rbd: Add rados namespace support for rbd

Make sure to operate within the namespace if any given
when dealing with rbd images and snapshots and their journals.

Signed-off-by: Mehdy Khoshnoody <mehdy.khoshnoody@gmail.com>
This commit is contained in:
Mehdy Khoshnoody 2020-06-01 18:27:51 +04:30 committed by Humble Devassy Chirammal
parent b5320d9273
commit fc5eadf106
12 changed files with 125 additions and 58 deletions

View File

@ -65,7 +65,8 @@ func checkVolExists(ctx context.Context,
sID *snapshotIdentifier, sID *snapshotIdentifier,
cr *util.Credentials) (*volumeIdentifier, error) { cr *util.Credentials) (*volumeIdentifier, error) {
var vid volumeIdentifier var vid volumeIdentifier
j, err := volJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := volJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -174,7 +175,8 @@ func undoVolReservation(ctx context.Context, volOptions *volumeOptions, vid volu
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
j, err := volJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := volJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -220,7 +222,8 @@ func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[strin
return nil, err return nil, err
} }
j, err := volJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := volJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -256,7 +259,8 @@ func reserveSnap(ctx context.Context, volOptions *volumeOptions, parentSubVolNam
err error err error
) )
j, err := snapJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := snapJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,7 +289,8 @@ func reserveSnap(ctx context.Context, volOptions *volumeOptions, parentSubVolNam
// undoSnapReservation is a helper routine to undo a name reservation for a CSI SnapshotName. // undoSnapReservation is a helper routine to undo a name reservation for a CSI SnapshotName.
func undoSnapReservation(ctx context.Context, volOptions *volumeOptions, vid snapshotIdentifier, snapName string, cr *util.Credentials) error { func undoSnapReservation(ctx context.Context, volOptions *volumeOptions, vid snapshotIdentifier, snapName string, cr *util.Credentials) error {
j, err := snapJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := snapJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -316,7 +321,8 @@ func checkSnapExists(
parentSubVolName string, parentSubVolName string,
snap *cephfsSnapshot, snap *cephfsSnapshot,
cr *util.Credentials) (*snapshotIdentifier, *snapshotInfo, error) { cr *util.Credentials) (*snapshotIdentifier, *snapshotInfo, error) {
j, err := snapJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := snapJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -261,7 +261,8 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, err return nil, nil, err
} }
j, err := volJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := volJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -448,7 +449,8 @@ func newSnapshotOptionsFromID(ctx context.Context, snapID string, cr *util.Crede
return &volOptions, nil, &sid, err return &volOptions, nil, &sid, err
} }
j, err := snapJournal.Connect(volOptions.Monitors, cr) // Connect to cephfs' default radosNamespace (csi)
j, err := snapJournal.Connect(volOptions.Monitors, radosNamespace, cr)
if err != nil { if err != nil {
return &volOptions, nil, &sid, err return &volOptions, nil, &sid, err
} }

View File

@ -235,7 +235,8 @@ type Connection struct {
} }
// Connect establishes a new connection to a ceph cluster for journal metadata. // Connect establishes a new connection to a ceph cluster for journal metadata.
func (cj *Config) Connect(monitors string, cr *util.Credentials) (*Connection, error) { func (cj *Config) Connect(monitors, namespace string, cr *util.Credentials) (*Connection, error) {
cj.namespace = namespace
cc := &util.ClusterConnection{} cc := &util.ClusterConnection{}
if err := cc.Connect(monitors, cr); err != nil { if err := cc.Connect(monitors, cr); err != nil {
return nil, err return nil, err

View File

@ -164,7 +164,7 @@ func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVol
) )
var j = &journal.Connection{} var j = &journal.Connection{}
j, err = volJournal.Connect(rv.Monitors, rv.conn.Creds) j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
if err != nil { if err != nil {
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }

View File

@ -75,6 +75,9 @@ func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.Crea
if value, ok := options["dataPool"]; ok && value == "" { if value, ok := options["dataPool"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty datapool name to provision volume from") return status.Error(codes.InvalidArgument, "empty datapool name to provision volume from")
} }
if value, ok := options["raodsNamespace"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty namespace name to provision volume from")
}
if value, ok := options["volumeNamePrefix"]; ok && value == "" { if value, ok := options["volumeNamePrefix"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty volume name prefix to provision volume from") return status.Error(codes.InvalidArgument, "empty volume name prefix to provision volume from")
} }
@ -146,6 +149,9 @@ func buildCreateVolumeResponse(ctx context.Context, req *csi.CreateVolumeRequest
volumeContext["pool"] = rbdVol.Pool volumeContext["pool"] = rbdVol.Pool
volumeContext["journalPool"] = rbdVol.JournalPool volumeContext["journalPool"] = rbdVol.JournalPool
volumeContext["imageName"] = rbdVol.RbdImageName volumeContext["imageName"] = rbdVol.RbdImageName
if rbdVol.RadosNamespace != "" {
volumeContext["radosNamespace"] = rbdVol.RadosNamespace
}
volume := &csi.Volume{ volume := &csi.Volume{
VolumeId: rbdVol.VolID, VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize, CapacityBytes: rbdVol.VolSize,
@ -291,6 +297,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volumeContext := req.GetParameters() volumeContext := req.GetParameters()
volumeContext["pool"] = rbdVol.Pool volumeContext["pool"] = rbdVol.Pool
volumeContext["journalPool"] = rbdVol.JournalPool volumeContext["journalPool"] = rbdVol.JournalPool
volumeContext["radosNamespace"] = rbdVol.RadosNamespace
volumeContext["imageName"] = rbdVol.RbdImageName volumeContext["imageName"] = rbdVol.RbdImageName
volume := &csi.Volume{ volume := &csi.Volume{
VolumeId: rbdVol.VolID, VolumeId: rbdVol.VolID,
@ -409,7 +416,7 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
var err error var err error
var j = &journal.Connection{} var j = &journal.Connection{}
j, err = volJournal.Connect(rbdVol.Monitors, cr) j, err = volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
if err != nil { if err != nil {
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
@ -906,7 +913,7 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV
} }
var j = &journal.Connection{} var j = &journal.Connection{}
// save image ID // save image ID
j, err = snapJournal.Connect(rbdSnap.Monitors, cr) j, err = snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil { if err != nil {
klog.Errorf(util.Log(ctx, "failed to connect to cluster: %v"), err) klog.Errorf(util.Log(ctx, "failed to connect to cluster: %v"), err)
return ready, cloneRbd, err return ready, cloneRbd, err

View File

@ -19,7 +19,7 @@ package rbd
import "errors" import "errors"
var ( var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool. // ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found") ErrImageNotFound = errors.New("image not found")
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the // ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image. // given image.

View File

@ -187,7 +187,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
j, err2 := volJournal.Connect(volOptions.Monitors, cr) j, err2 := volJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr)
if err2 != nil { if err2 != nil {
klog.Errorf( klog.Errorf(
util.Log(ctx, "failed to establish cluster connection: %v"), util.Log(ctx, "failed to establish cluster connection: %v"),
@ -751,7 +751,7 @@ func getDevicePath(ctx context.Context, volumePath string) (string, error) {
if err != nil { if err != nil {
klog.Errorf(util.Log(ctx, "failed to find image metadata: %v"), err) klog.Errorf(util.Log(ctx, "failed to find image metadata: %v"), err)
} }
device, found := findDeviceMappingImage(ctx, imgInfo.Pool, imgInfo.ImageName, imgInfo.NbdAccess) device, found := findDeviceMappingImage(ctx, imgInfo.Pool, imgInfo.RadosNamespace, imgInfo.ImageName, imgInfo.NbdAccess)
if found { if found {
return device, nil return device, nil
} }

View File

@ -60,6 +60,7 @@ func init() {
type rbdDeviceInfo struct { type rbdDeviceInfo struct {
ID string `json:"id"` ID string `json:"id"`
Pool string `json:"pool"` Pool string `json:"pool"`
RadosNamespace string `json:"radosNamespace"`
Name string `json:"name"` Name string `json:"name"`
Device string `json:"device"` Device string `json:"device"`
} }
@ -71,6 +72,7 @@ type rbdDeviceInfo struct {
type nbdDeviceInfo struct { type nbdDeviceInfo struct {
ID int64 `json:"id"` ID int64 `json:"id"`
Pool string `json:"pool"` Pool string `json:"pool"`
RadosNamespace string `json:"radosNamespace"`
Name string `json:"image"` Name string `json:"image"`
Device string `json:"device"` Device string `json:"device"`
} }
@ -106,6 +108,7 @@ func rbdGetDeviceList(ctx context.Context, accessType string) ([]rbdDeviceInfo,
rbdDeviceInfo{ rbdDeviceInfo{
ID: strconv.FormatInt(device.ID, 10), ID: strconv.FormatInt(device.ID, 10),
Pool: device.Pool, Pool: device.Pool,
RadosNamespace: device.RadosNamespace,
Name: device.Name, Name: device.Name,
Device: device.Device, Device: device.Device,
}) })
@ -115,21 +118,26 @@ func rbdGetDeviceList(ctx context.Context, accessType string) ([]rbdDeviceInfo,
return rbdDeviceList, nil return rbdDeviceList, nil
} }
// findDeviceMappingImage finds a devicePath, if available, based on image spec (pool/image) on the node. // findDeviceMappingImage finds a devicePath, if available, based on image spec (pool/{namespace/}image) on the node.
func findDeviceMappingImage(ctx context.Context, pool, image string, useNbdDriver bool) (string, bool) { func findDeviceMappingImage(ctx context.Context, pool, namespace, image string, useNbdDriver bool) (string, bool) {
accessType := accessTypeKRbd accessType := accessTypeKRbd
if useNbdDriver { if useNbdDriver {
accessType = accessTypeNbd accessType = accessTypeNbd
} }
imageSpec := fmt.Sprintf("%s/%s", pool, image)
if namespace != "" {
imageSpec = fmt.Sprintf("%s/%s/%s", pool, namespace, image)
}
rbdDeviceList, err := rbdGetDeviceList(ctx, accessType) rbdDeviceList, err := rbdGetDeviceList(ctx, accessType)
if err != nil { if err != nil {
klog.Warningf(util.Log(ctx, "failed to determine if image (%s/%s) is mapped to a device (%v)"), pool, image, err) klog.Warningf(util.Log(ctx, "failed to determine if image (%s) is mapped to a device (%v)"), imageSpec, err)
return "", false return "", false
} }
for _, device := range rbdDeviceList { for _, device := range rbdDeviceList {
if device.Name == image && device.Pool == pool { if device.Name == image && device.Pool == pool && device.RadosNamespace == namespace {
return device.Device, true return device.Device, true
} }
} }
@ -138,13 +146,13 @@ func findDeviceMappingImage(ctx context.Context, pool, image string, useNbdDrive
} }
// Stat a path, if it doesn't exist, retry maxRetries times. // Stat a path, if it doesn't exist, retry maxRetries times.
func waitForPath(ctx context.Context, pool, image string, maxRetries int, useNbdDriver bool) (string, bool) { func waitForPath(ctx context.Context, pool, namespace, image string, maxRetries int, useNbdDriver bool) (string, bool) {
for i := 0; i < maxRetries; i++ { for i := 0; i < maxRetries; i++ {
if i != 0 { if i != 0 {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
device, found := findDeviceMappingImage(ctx, pool, image, useNbdDriver) device, found := findDeviceMappingImage(ctx, pool, namespace, image, useNbdDriver)
if found { if found {
return device, found return device, found
} }
@ -182,7 +190,7 @@ func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credent
useNBD = true useNBD = true
} }
devicePath, found := waitForPath(ctx, volOptions.Pool, image, 1, useNBD) devicePath, found := waitForPath(ctx, volOptions.Pool, volOptions.RadosNamespace, image, 1, useNBD)
if !found { if !found {
backoff := wait.Backoff{ backoff := wait.Backoff{
Duration: rbdImageWatcherInitDelay, Duration: rbdImageWatcherInitDelay,

View File

@ -117,7 +117,7 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb
return false, err return false, err
} }
j, err := snapJournal.Connect(rbdSnap.Monitors, cr) j, err := snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -242,7 +242,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
kmsID = rv.KMS.GetID() kmsID = rv.KMS.GetID()
} }
j, err := volJournal.Connect(rv.Monitors, rv.conn.Creds) j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -345,7 +345,7 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, rbdVol *rbdVolume, c
return err return err
} }
j, err := snapJournal.Connect(rbdSnap.Monitors, cr) j, err := snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -423,7 +423,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr
kmsID = rbdVol.KMS.GetID() kmsID = rbdVol.KMS.GetID()
} }
j, err := volJournal.Connect(rbdVol.Monitors, cr) j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -450,7 +450,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr
// undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot. // undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot.
func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error { func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
j, err := snapJournal.Connect(rbdSnap.Monitors, cr) j, err := snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -465,7 +465,7 @@ func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Cre
// undoVolReservation is a helper routine to undo a name reservation for rbdVolume. // undoVolReservation is a helper routine to undo a name reservation for rbdVolume.
func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
j, err := volJournal.Connect(rbdVol.Monitors, cr) j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }

View File

@ -86,6 +86,7 @@ type rbdVolume struct {
JournalPool string JournalPool string
Pool string `json:"pool"` Pool string `json:"pool"`
DataPool string DataPool string
RadosNamespace string
ImageID string ImageID string
ParentName string ParentName string
imageFeatureSet librbd.FeatureSet imageFeatureSet librbd.FeatureSet
@ -129,6 +130,7 @@ type rbdSnapshot struct {
Monitors string Monitors string
JournalPool string JournalPool string
Pool string Pool string
RadosNamespace string
CreatedAt *timestamp.Timestamp CreatedAt *timestamp.Timestamp
SizeBytes int64 SizeBytes int64
ClusterID string ClusterID string
@ -165,13 +167,19 @@ func (rv *rbdVolume) Destroy() {
} }
} }
// String returns the image-spec (pool/image) format of the image. // String returns the image-spec (pool/{namespace/}image) format of the image.
func (rv *rbdVolume) String() string { func (rv *rbdVolume) String() string {
if rv.RadosNamespace != "" {
return fmt.Sprintf("%s/%s/%s", rv.Pool, rv.RadosNamespace, rv.RbdImageName)
}
return fmt.Sprintf("%s/%s", rv.Pool, rv.RbdImageName) return fmt.Sprintf("%s/%s", rv.Pool, rv.RbdImageName)
} }
// String returns the snap-spec (pool/image@snap) format of the snapshot. // String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot.
func (rs *rbdSnapshot) String() string { func (rs *rbdSnapshot) String() string {
if rs.RadosNamespace != "" {
return fmt.Sprintf("%s/%s/%s@%s", rs.Pool, rs.RadosNamespace, rs.RbdImageName, rs.RbdSnapName)
}
return fmt.Sprintf("%s/%s@%s", rs.Pool, rs.RbdImageName, rs.RbdSnapName) return fmt.Sprintf("%s/%s@%s", rs.Pool, rs.RbdImageName, rs.RbdSnapName)
} }
@ -228,6 +236,7 @@ func (rv *rbdVolume) openIoctx() error {
return err return err
} }
ioctx.SetNamespace(rv.RadosNamespace)
rv.ioctx = ioctx rv.ioctx = ioctx
return nil return nil
@ -485,6 +494,7 @@ func (rv *rbdVolume) hasFeature(feature uint64) bool {
func (rv *rbdVolume) checkImageChainHasFeature(ctx context.Context, feature uint64) (bool, error) { func (rv *rbdVolume) checkImageChainHasFeature(ctx context.Context, feature uint64) (bool, error) {
vol := rbdVolume{ vol := rbdVolume{
Pool: rv.Pool, Pool: rv.Pool,
RadosNamespace: rv.RadosNamespace,
Monitors: rv.Monitors, Monitors: rv.Monitors,
RbdImageName: rv.RbdImageName, RbdImageName: rv.RbdImageName,
conn: rv.conn, conn: rv.conn,
@ -543,7 +553,12 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
} }
rbdSnap.JournalPool = rbdSnap.Pool rbdSnap.JournalPool = rbdSnap.Pool
j, err := snapJournal.Connect(rbdSnap.Monitors, cr) rbdSnap.RadosNamespace, err = util.RadosNamespace(csiConfigFile, rbdSnap.ClusterID)
if err != nil {
return err
}
j, err := snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
if err != nil { if err != nil {
return err return err
} }
@ -611,7 +626,12 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
} }
rbdVol.JournalPool = rbdVol.Pool rbdVol.JournalPool = rbdVol.Pool
j, err := volJournal.Connect(rbdVol.Monitors, cr) rbdVol.RadosNamespace, err = util.RadosNamespace(csiConfigFile, rbdVol.ClusterID)
if err != nil {
return nil, err
}
j, err := volJournal.Connect(rbdVol.Monitors, rbdVol.RadosNamespace, cr)
if err != nil { if err != nil {
return rbdVol, err return rbdVol, err
} }
@ -690,6 +710,10 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st
return nil, err return nil, err
} }
rbdVol.RadosNamespace, err = util.RadosNamespace(csiConfigFile, rbdVol.ClusterID)
if err != nil {
return nil, err
}
// if no image features is provided, it results in empty string // if no image features is provided, it results in empty string
// which disable all RBD image features as we expected // which disable all RBD image features as we expected
@ -742,6 +766,7 @@ func genSnapFromOptions(ctx context.Context, rbdVol *rbdVolume, snapOptions map[
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
rbdSnap.Pool = rbdVol.Pool rbdSnap.Pool = rbdVol.Pool
rbdSnap.JournalPool = rbdVol.JournalPool rbdSnap.JournalPool = rbdVol.JournalPool
rbdSnap.RadosNamespace = rbdVol.RadosNamespace
rbdSnap.Monitors, rbdSnap.ClusterID, err = util.GetMonsAndClusterID(snapOptions) rbdSnap.Monitors, rbdSnap.ClusterID, err = util.GetMonsAndClusterID(snapOptions)
if err != nil { if err != nil {
@ -953,6 +978,7 @@ func (rv *rbdVolume) checkSnapExists(rbdSnap *rbdSnapshot) error {
type rbdImageMetadataStash struct { type rbdImageMetadataStash struct {
Version int `json:"Version"` Version int `json:"Version"`
Pool string `json:"pool"` Pool string `json:"pool"`
RadosNamespace string `json:"radosNamespace"`
ImageName string `json:"image"` ImageName string `json:"image"`
NbdAccess bool `json:"accessType"` NbdAccess bool `json:"accessType"`
Encrypted bool `json:"encrypted"` Encrypted bool `json:"encrypted"`
@ -961,8 +987,11 @@ type rbdImageMetadataStash struct {
// file name in which image metadata is stashed. // file name in which image metadata is stashed.
const stashFileName = "image-meta.json" const stashFileName = "image-meta.json"
// spec returns the image-spec (pool/image) format of the image. // spec returns the image-spec (pool/{namespace/}image) format of the image.
func (ri *rbdImageMetadataStash) String() string { func (ri *rbdImageMetadataStash) String() string {
if ri.RadosNamespace != "" {
return fmt.Sprintf("%s/%s/%s", ri.Pool, ri.RadosNamespace, ri.ImageName)
}
return fmt.Sprintf("%s/%s", ri.Pool, ri.ImageName) return fmt.Sprintf("%s/%s", ri.Pool, ri.ImageName)
} }
@ -973,6 +1002,7 @@ func stashRBDImageMetadata(volOptions *rbdVolume, path string) error {
// there are no checks for this at present // there are no checks for this at present
Version: 2, // nolint:gomnd // number specifies version. Version: 2, // nolint:gomnd // number specifies version.
Pool: volOptions.Pool, Pool: volOptions.Pool,
RadosNamespace: volOptions.RadosNamespace,
ImageName: volOptions.RbdImageName, ImageName: volOptions.RbdImageName,
Encrypted: volOptions.Encrypted, Encrypted: volOptions.Encrypted,
} }

View File

@ -88,6 +88,7 @@ func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume {
vol.Monitors = rbdSnap.Monitors vol.Monitors = rbdSnap.Monitors
vol.Pool = rbdSnap.Pool vol.Pool = rbdSnap.Pool
vol.JournalPool = rbdSnap.JournalPool vol.JournalPool = rbdSnap.JournalPool
vol.RadosNamespace = rbdSnap.RadosNamespace
vol.RbdImageName = rbdSnap.RbdSnapName vol.RbdImageName = rbdSnap.RbdSnapName
vol.ImageID = rbdSnap.ImageID vol.ImageID = rbdSnap.ImageID
return vol return vol

View File

@ -37,6 +37,8 @@ const (
type ClusterInfo struct { type ClusterInfo struct {
// ClusterID is used for unique identification // ClusterID is used for unique identification
ClusterID string `json:"clusterID"` ClusterID string `json:"clusterID"`
// Namespace is the namespace in the pool
RadosNamespace string `json:"radosNamespace"`
// Monitors is monitor list for corresponding cluster ID // Monitors is monitor list for corresponding cluster ID
Monitors []string `json:"monitors"` Monitors []string `json:"monitors"`
// CephFS contains CephFS specific options // CephFS contains CephFS specific options
@ -50,6 +52,7 @@ type ClusterInfo struct {
// [ // [
// { // {
// "clusterID": "<cluster-id>", // "clusterID": "<cluster-id>",
// "namespace": "<namespace>",
// "monitors": // "monitors":
// [ // [
// "<monitor-value>", // "<monitor-value>",
@ -100,6 +103,15 @@ func Mons(pathToConfig, clusterID string) (string, error) {
return strings.Join(cluster.Monitors, ","), nil return strings.Join(cluster.Monitors, ","), nil
} }
// RadosNamespace returns the namespace for the given clusterID.
func RadosNamespace(pathToConfig, clusterID string) (string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID)
if err != nil {
return "", err
}
return cluster.RadosNamespace, nil
}
// CephFSSubvolumeGroup returns the subvolumeGroup for CephFS volumes. If not set, it returns the default value "csi". // CephFSSubvolumeGroup returns the subvolumeGroup for CephFS volumes. If not set, it returns the default value "csi".
func CephFSSubvolumeGroup(pathToConfig, clusterID string) (string, error) { func CephFSSubvolumeGroup(pathToConfig, clusterID string) (string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID) cluster, err := readClusterInfo(pathToConfig, clusterID)