Merge pull request #276 from red-hat-storage/sync_us--devel

Syncing latest changes from upstream devel for ceph-csi
openshift-merge-bot[bot] 2024-03-21 08:08:40 +00:00 committed by GitHub
commit 47c48161d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1955 additions and 73 deletions

View File

@ -47,7 +47,8 @@ endif
GO_PROJECT=github.com/ceph/ceph-csi
CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION})
GO_TAGS_LIST ?= $(CEPH_VERSION)
# TODO: ceph_preview tag required for FSQuiesce API
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview
# go build flags
LDFLAGS ?=

go.mod (4 lines changed)
View File

@ -6,10 +6,10 @@ toolchain go1.21.5
require (
github.com/IBM/keyprotect-go-client v0.12.2
github.com/aws/aws-sdk-go v1.50.26
github.com/aws/aws-sdk-go v1.50.32
github.com/aws/aws-sdk-go-v2/service/sts v1.28.1
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.26.0
github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243
github.com/container-storage-interface/spec v1.9.0
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444
github.com/gemalto/kmip-go v0.0.10

go.sum (8 lines changed)
View File

@ -833,8 +833,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.44.164/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.50.26 h1:tuv8+dje59DBK1Pj65tSCdD36oamBxKYJgbng4bFylc=
github.com/aws/aws-sdk-go v1.50.26/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.50.32 h1:POt81DvegnpQKM4DMDLlHz1CO6OBnEoQ1gRhYFd7QRY=
github.com/aws/aws-sdk-go v1.50.32/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
@ -870,8 +870,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/ceph/go-ceph v0.26.0 h1:LZoATo25ZH5aeL5t85BwIbrNLKCDfcDM+e0qV0cmwHY=
github.com/ceph/go-ceph v0.26.0/go.mod h1:ISxb295GszZwtLPkeWi+L2uLYBVsqbsh0M104jZMOX4=
github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 h1:O99PJ2rNxY+XiN2swRSmJC24V3YInVt5Lk48Em1cdVE=
github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243/go.mod h1:PS15ql+uqcnZN8uD3WuxlImxdaTYtxqJoaTmlFJYnbI=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=

View File

@ -56,6 +56,10 @@ type ControllerServer struct {
// A map storing all volumes/snapshots with ongoing operations.
OperationLocks *util.OperationLock
// A map storing all volume groups with ongoing operations so that additional operations
// for the same volume group (as defined by volume group ID/volume group name) return an Aborted error
VolumeGroupLocks *util.VolumeLocks
// Cluster name
ClusterName string
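As an illustration of the locking behaviour described above (a minimal sketch, not part of this change; doGroupOperation is a hypothetical method name), a handler takes the per-request lock first and returns an Aborted error when another operation on the same volume group is already in flight:

func (cs *ControllerServer) doGroupOperation(ctx context.Context, requestName string) error {
	// Only one operation per volume group request name may run at a time.
	if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired {
		return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
	}
	defer cs.VolumeGroupLocks.Release(requestName)
	// ... perform the volume group snapshot work here ...
	return nil
}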

View File

@ -0,0 +1,250 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
)
type QuiesceState string
const (
Released QuiesceState = "RELEASED"
Quiescing QuiesceState = "QUIESCING"
Quiesced QuiesceState = "QUIESCED"
)
// GetQuiesceState returns the quiesce state of the filesystem.
func GetQuiesceState(set admin.QuiesceState) QuiesceState {
var state QuiesceState
switch set.Name {
case "RELEASED":
state = Released
case "QUIESCING":
state = Quiescing
case "QUIESCED":
state = Quiesced
default:
state = QuiesceState(set.Name)
}
return state
}
type FSQuiesceClient interface {
// Destroy destroys the connection used for FSAdmin.
Destroy()
// FSQuiesce quiesces the subvolumes in the filesystem.
FSQuiesce(
ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error)
// GetVolumes returns the list of volumes in the filesystem that are to be
// quiesced.
GetVolumes() []Volume
// FSQuiesceWithExpireTimeout quiesces the subvolumes in the filesystem
// with an expiration timeout. It should be used after FSQuiesce to reset
// the expire timeout. This helps in keeping the subvolumes in the
// filesystem in quiesced state until all snapshots are taken.
FSQuiesceWithExpireTimeout(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error)
// ResetFSQuiesce resets the quiesce timeout for the subvolumes in
// the filesystem.
ResetFSQuiesce(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error)
// ReleaseFSQuiesce releases the quiesce on the subvolumes in the
// filesystem.
ReleaseFSQuiesce(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error)
}
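A minimal sketch of the lifecycle this interface describes (illustrative only, assuming the standard errors package is imported; takeSnapshots stands in for the caller's snapshot step): quiesce first, keep re-arming the expiration while snapshots are taken, then release.

func groupSnapshotWithQuiesce(
	ctx context.Context,
	fq FSQuiesceClient,
	reserveName string,
	takeSnapshots func(ctx context.Context) error,
) error {
	defer fq.Destroy()
	// Quiesce all subvolumes covered by the reservation.
	info, err := fq.FSQuiesce(ctx, reserveName)
	if err != nil {
		return err
	}
	if GetQuiesceState(info.State) != Quiesced {
		return errors.New("quiesce still in progress, retry later")
	}
	// Re-arm the expiration so the subvolumes stay quiesced until every
	// snapshot in the group has been taken.
	if _, err = fq.FSQuiesceWithExpireTimeout(ctx, reserveName); err != nil {
		return err
	}
	if err = takeSnapshots(ctx); err != nil {
		return err
	}
	// Release the quiesce once all snapshots exist.
	_, err = fq.ReleaseFSQuiesce(ctx, reserveName)
	return err
}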
type Volume struct {
VolumeID string
ClusterID string
}
type fsQuiesce struct {
connection *util.ClusterConnection
fsName string
volumes []Volume
// subVolumeGroupMapping is a map of subvolumes to groups.
subVolumeGroupMapping map[string][]string
fsa *admin.FSAdmin
}
// NewFSQuiesce returns a new instance of fsQuiesce. It
// takes the filesystem name, the list of volumes to be quiesced, the mapping of
// subvolumes to groups and the cluster connection as input.
func NewFSQuiesce(
fsName string,
volumes []Volume,
mapping map[string][]string,
conn *util.ClusterConnection,
) (FSQuiesceClient, error) {
fsa, err := conn.GetFSAdmin()
if err != nil {
return nil, err
}
return &fsQuiesce{
connection: conn,
fsName: fsName,
volumes: volumes,
subVolumeGroupMapping: mapping,
fsa: fsa,
}, nil
}
// Destroy destroys the connection used for FSAdmin.
func (fq *fsQuiesce) Destroy() {
if fq.connection != nil {
fq.connection.Destroy()
}
}
// GetVolumes returns the list of volumes in the filesystem that are to be
// quiesced.
func (fq *fsQuiesce) GetVolumes() []Volume {
return fq.volumes
}
// getMembers returns the list of names in the format
// group/subvolume that are to be quiesced. This is the format that the
// ceph fs quiesce command expects.
// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"].
func (fq *fsQuiesce) getMembers() []string {
volName := []string{}
for svg, sb := range fq.subVolumeGroupMapping {
for _, s := range sb {
name := svg + "/" + s
volName = append(volName, name)
}
}
return volName
}
func (fq *fsQuiesce) FSQuiesce(
ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error) {
opt := &admin.FSQuiesceOptions{
Timeout: 180,
AwaitFor: 0,
Expiration: 180,
}
log.DebugLog(ctx,
"FSQuiesce for reserveName %s: members:%v options:%v",
reserveName,
fq.getMembers(),
opt)
resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt)
if resp != nil {
qInfo := resp.Sets[reserveName]
return &qInfo, nil
}
log.ErrorLog(ctx, "failed to quiesce filesystem %s", err)
return nil, err
}
func (fq *fsQuiesce) FSQuiesceWithExpireTimeout(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error) {
opt := &admin.FSQuiesceOptions{
Timeout: 180,
AwaitFor: 0,
Expiration: 180,
}
log.DebugLog(ctx,
"FSQuiesceWithExpireTimeout for reserveName %s: members:%v options:%v",
reserveName,
fq.getMembers(),
opt)
resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt)
if resp != nil {
qInfo := resp.Sets[reserveName]
return &qInfo, nil
}
log.ErrorLog(ctx, "failed to quiesce filesystem with expire timeout %s", err)
return nil, err
}
func (fq *fsQuiesce) ResetFSQuiesce(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error) {
opt := &admin.FSQuiesceOptions{
Reset: true,
AwaitFor: 0,
Timeout: 180,
Expiration: 180,
}
// Reset the filesystem quiesce so that the timer will be reset, and we can
// reuse the same reservation if it has already failed or timed out.
log.DebugLog(ctx,
"ResetFSQuiesce for reserveName %s: members:%v options:%v",
reserveName,
fq.getMembers(),
opt)
resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt)
if resp != nil {
qInfo := resp.Sets[reserveName]
return &qInfo, nil
}
log.ErrorLog(ctx, "failed to reset timeout for quiesce filesystem %s", err)
return nil, err
}
func (fq *fsQuiesce) ReleaseFSQuiesce(ctx context.Context,
reserveName string,
) (*admin.QuiesceInfo, error) {
opt := &admin.FSQuiesceOptions{
AwaitFor: 0,
Release: true,
}
log.DebugLog(ctx,
"ReleaseFSQuiesce for reserveName %s: members:%v options:%v",
reserveName,
fq.getMembers(),
opt)
resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, []string{}, reserveName, opt)
if resp != nil {
qInfo := resp.Sets[reserveName]
return &qInfo, nil
}
log.ErrorLog(ctx, "failed to release quiesce of filesystem %s", err)
return nil, err
}

View File

@ -67,6 +67,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
VolumeLocks: util.NewVolumeLocks(),
SnapshotLocks: util.NewVolumeLocks(),
VolumeGroupLocks: util.NewVolumeLocks(),
OperationLocks: util.NewOperationLock(),
}
}
@ -124,6 +125,10 @@ func (fs *Driver) Run(conf *util.Config) {
store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace)
store.SnapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace)
store.VolumeGroupJournal = journal.NewCSIVolumeGroupJournalWithNamespace(
CSIInstanceID,
fsutil.RadosNamespace)
// Initialize default library driver
fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
@ -146,6 +151,10 @@ func (fs *Driver) Run(conf *util.Config) {
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
})
fs.cd.AddGroupControllerServiceCapabilities([]csi.GroupControllerServiceCapability_RPC_Type{
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT,
})
}
// Create gRPC servers
@ -192,6 +201,7 @@ func (fs *Driver) Run(conf *util.Config) {
IS: fs.is,
CS: fs.cs,
NS: fs.ns,
GS: fs.cs,
}
server.Start(conf.Endpoint, srv)

View File

@ -58,6 +58,9 @@ var (
// ErrVolumeHasSnapshots is returned when a subvolume has snapshots.
ErrVolumeHasSnapshots = coreError.New("volume has snapshots")
// ErrQuiesceInProgress is returned when quiesce operation is in progress.
ErrQuiesceInProgress = coreError.New("quiesce operation is in progress")
)
// IsCloneRetryError returns true if the clone error is pending,in-progress

View File

@ -0,0 +1,779 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/ceph/ceph-csi/internal/cephfs/core"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/utils/strings/slices"
)
// validateCreateVolumeGroupSnapshotRequest validates the request for creating
// a group snapshot of volumes.
func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
) error {
if err := cs.Driver.ValidateGroupControllerServiceRequest(
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
return err
}
// Check sanity of the request's volume group snapshot Name and source volume IDs.
if req.GetName() == "" {
return status.Error(codes.InvalidArgument, "volume group snapshot Name cannot be empty")
}
if len(req.GetSourceVolumeIds()) == 0 {
return status.Error(codes.InvalidArgument, "source volume ids cannot be empty")
}
param := req.GetParameters()
// check for ClusterID and fsName
if value, ok := param["clusterID"]; !ok || value == "" {
return status.Error(codes.InvalidArgument, "missing or empty clusterID")
}
if value, ok := param["fsName"]; !ok || value == "" {
return status.Error(codes.InvalidArgument, "missing or empty fsName")
}
return nil
}
// CreateVolumeGroupSnapshot creates a group snapshot of volumes.
func (cs *ControllerServer) CreateVolumeGroupSnapshot(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest) (
*csi.CreateVolumeGroupSnapshotResponse,
error,
) {
if err := cs.validateCreateVolumeGroupSnapshotRequest(ctx, req); err != nil {
return nil, err
}
requestName := req.GetName()
// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
}
defer cs.VolumeGroupLocks.Release(requestName)
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
vg, err := store.NewVolumeGroupOptions(ctx, req, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
defer vg.Destroy()
vgs, err := store.CheckVolumeGroupSnapExists(ctx, vg, cr)
if err != nil {
log.ErrorLog(ctx, "failed to check volume group snapshot exists: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
// Get the fs names and subvolumes from the volume IDs to execute quiesce commands.
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), req.GetSourceVolumeIds(), cr)
if err != nil {
log.ErrorLog(ctx, "failed to get fs names and subvolume from volume ids: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
defer destroyFSConnections(fsMap)
needRelease := checkIfFSNeedQuiesceRelease(vgs, req.GetSourceVolumeIds())
if needRelease {
return cs.releaseQuiesceAndGetVolumeGroupSnapshotResponse(ctx, req, vgs, fsMap, vg, cr)
}
// If the volume group snapshot does not exist, reserve the volume group
if vgs == nil {
vgs, err = store.ReserveVolumeGroup(ctx, vg, cr)
if err != nil {
log.ErrorLog(ctx, "failed to reserve volume group: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
}
inProgress, err := cs.queisceFileSystems(ctx, vgs, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystems: %v", err)
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
return nil, status.Error(codes.Internal, err.Error())
}
if inProgress {
return nil, status.Error(codes.Internal, "Quiesce operation is in progress")
}
resp, err := cs.createSnapshotAddToVolumeGroupJournal(ctx, req, vg, vgs, cr, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot and add to volume group journal: %v", err)
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
// Handle Undo reservation and timeout as well
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
return nil, status.Error(codes.Internal, err.Error())
}
response := &csi.CreateVolumeGroupSnapshotResponse{}
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
ReadyToUse: true,
CreationTime: timestamppb.New(time.Now()),
}
for _, r := range *resp {
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
}
return response, nil
}
// queisceFileSystems quiesces the subvolumes and subvolume groups present in
// the filesystems of the volumeIDs present in the
// CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) queisceFileSystems(ctx context.Context,
vgs *store.VolumeGroupSnapshotIdentifier,
fsMap map[string]core.FSQuiesceClient,
) (bool, error) {
var inProgress bool
for _, fm := range fsMap {
// Quiesce the fs, subvolumes and subvolume groups
data, err := fm.FSQuiesce(ctx, vgs.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem: %v", err)
return inProgress, err
}
state := core.GetQuiesceState(data.State)
if state == core.Quiescing {
inProgress = true
} else if state != core.Quiesced {
return inProgress, fmt.Errorf("quiesce operation is in %s state", state)
}
}
return inProgress, nil
}
// releaseQuiesceAndGetVolumeGroupSnapshotResponse releases the quiesce of the
// subvolumes and subvolume groups in the filesystems for the volumeIDs
// present in the CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) releaseQuiesceAndGetVolumeGroupSnapshotResponse(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
vgs *store.VolumeGroupSnapshotIdentifier,
fsMap map[string]core.FSQuiesceClient,
vg *store.VolumeGroupOptions,
cr *util.Credentials,
) (*csi.CreateVolumeGroupSnapshotResponse, error) {
matchesSourceVolumeIDs := matchesSourceVolumeIDs(vgs.GetVolumeIDs(), req.GetSourceVolumeIds())
if !matchesSourceVolumeIDs {
return nil, status.Errorf(
codes.InvalidArgument,
"source volume ids %v do not match in the existing volume group snapshot %v",
req.GetSourceVolumeIds(),
vgs.GetVolumeIDs())
}
// Release the quiesce of the subvolumes and subvolume groups in the
// filesystems for the volumes.
for _, fm := range fsMap {
// UnFreeze the filesystems, subvolumes and subvolume groups
data, err := fm.ReleaseFSQuiesce(ctx, vg.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
return nil, status.Errorf(codes.Internal, "failed to release filesystem quiesce: %v", err)
}
state := core.GetQuiesceState(data.State)
if state != core.Released {
return nil, status.Errorf(codes.Internal, "quiesce operation is in %s state", state)
}
}
var err error
defer func() {
if err != nil && !errors.Is(err, cerrors.ErrQuiesceInProgress) {
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
}()
snapshotResponses := make([]csi.CreateSnapshotResponse, 0)
for _, volID := range req.GetSourceVolumeIds() {
// Create the snapshot for the volumeID
clusterID := getClusterIDForVolumeID(fsMap, volID)
if clusterID == "" {
return nil, status.Errorf(codes.Internal, "failed to get clusterID for volumeID %s", volID)
}
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
clusterID,
req.GetSecrets())
var resp *csi.CreateSnapshotResponse
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vg, vgs, cr)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, status.Errorf(codes.Internal,
"failed to create snapshot and add to volume group journal: %v",
err)
}
snapshotResponses = append(snapshotResponses, *resp)
}
response := &csi.CreateVolumeGroupSnapshotResponse{}
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
ReadyToUse: true,
CreationTime: timestamppb.New(time.Now()),
}
for _, r := range snapshotResponses {
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
}
return response, nil
}
// createSnapshotAddToVolumeGroupJournal creates a snapshot for every source
// volume and adds the snapshotID and volumeID mapping to the volume group
// journal omap. Before each snapshot it re-arms the quiesce expiration so the
// subvolumes stay quiesced, and after all snapshots are taken it releases the
// quiesce. Any error (including cerrors.ErrQuiesceInProgress) is returned to
// the caller, which decides whether to delete the snapshots and undo the
// reservation.
func (cs *ControllerServer) createSnapshotAddToVolumeGroupJournal(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
vgo *store.VolumeGroupOptions,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
fsMap map[string]core.FSQuiesceClient) (
*[]csi.CreateSnapshotResponse,
error,
) {
var err error
var resp *csi.CreateSnapshotResponse
responses := make([]csi.CreateSnapshotResponse, 0)
for _, volID := range req.GetSourceVolumeIds() {
err = fsQuiesceWithExpireTimeout(ctx, vgo.RequestName, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
return nil, err
}
// Create the snapshot for the volumeID
clusterID := getClusterIDForVolumeID(fsMap, volID)
if clusterID == "" {
return nil, fmt.Errorf("failed to get clusterID for volumeID %s", volID)
}
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
clusterID,
req.GetSecrets())
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vgo, vgs, cr)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, err
}
responses = append(responses, *resp)
}
err = releaseFSQuiesce(ctx, vgo.RequestName, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
return nil, err
}
return &responses, nil
}
func formatCreateSnapshotRequest(volID, groupSnapshotName,
clusterID string,
secret map[string]string,
) *csi.CreateSnapshotRequest {
return &csi.CreateSnapshotRequest{
SourceVolumeId: volID,
Name: groupSnapshotName + "-" + volID,
Secrets: secret,
Parameters: map[string]string{
"clusterID": clusterID,
},
}
}
// releaseFSQuiesce releases the quiesce of the subvolumes and subvolume
// groups in the filesystems for the volumeIDs present in the
// CreateVolumeGroupSnapshotRequest.
func releaseFSQuiesce(ctx context.Context,
requestName string,
fsMap map[string]core.FSQuiesceClient,
) error {
inProgress := false
var err error
var data *admin.QuiesceInfo
for _, fm := range fsMap {
// UnFreeze the filesystems, subvolumes and subvolume groups
data, err = fm.ReleaseFSQuiesce(ctx, requestName)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
return err
}
state := core.GetQuiesceState(data.State)
if state != core.Released {
inProgress = true
}
}
if inProgress {
return cerrors.ErrQuiesceInProgress
}
return nil
}
// fsQuiesceWithExpireTimeout quiesces the subvolumes and subvolume
// groups in the filesystems for the volumeIDs present in the
// CreateVolumeGroupSnapshotRequest.
func fsQuiesceWithExpireTimeout(ctx context.Context,
requestName string,
fsMap map[string]core.FSQuiesceClient,
) error {
var err error
var data *admin.QuiesceInfo
inProgress := false
for _, fm := range fsMap {
// reinitialize the expiry timer for the quiesce
data, err = fm.FSQuiesceWithExpireTimeout(ctx, requestName)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
return err
}
state := core.GetQuiesceState(data.State)
if state == core.Quiescing {
inProgress = true
} else if state != core.Quiesced {
return fmt.Errorf("quiesce operation is in %s state", state)
}
}
if inProgress {
return cerrors.ErrQuiesceInProgress
}
return nil
}
// createSnapshotAndAddMapping creates the snapshot and adds the snapshotID and
// volumeID to the volume group journal omap. If any error occurs it will
// delete the last created snapshot as it is still not added to the journal.
func (cs *ControllerServer) createSnapshotAndAddMapping(
ctx context.Context,
req *csi.CreateSnapshotRequest,
vgo *store.VolumeGroupOptions,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
) (*csi.CreateSnapshotResponse, error) {
// Create the snapshot
resp, err := cs.CreateSnapshot(ctx, req)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, err
}
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
// Add the snapshot to the volume group journal
err = j.AddVolumeSnapshotMapping(ctx,
vgo.MetadataPool,
vgs.ReservedID,
req.GetSourceVolumeId(),
resp.GetSnapshot().GetSnapshotId())
if err != nil {
log.ErrorLog(ctx, "failed to add volume snapshot mapping: %v", err)
// Delete the last created snapshot as it is still not added to the
// journal
delReq := &csi.DeleteSnapshotRequest{
SnapshotId: resp.GetSnapshot().GetSnapshotId(),
Secrets: req.GetSecrets(),
}
_, dErr := cs.DeleteSnapshot(ctx, delReq)
if dErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", resp.GetSnapshot().GetSnapshotId(), dErr)
}
return nil, err
}
return resp, nil
}
// checkIfFSNeedQuiesceRelease checks whether snapshots already exist for all
// the volumes stored in the omap, in which case the quiesce can be released.
func checkIfFSNeedQuiesceRelease(vgs *store.VolumeGroupSnapshotIdentifier, volIDs []string) bool {
if vgs == nil {
return false
}
// The quiesce can be released only when the omap already holds a snapshot entry for every requested volume ID.
return len(vgs.GetVolumeIDs()) == len(volIDs)
}
// getClusterIDForVolumeID gets the clusterID for the volumeID from the fms map.
func getClusterIDForVolumeID(fms map[string]core.FSQuiesceClient, volumeID string) string {
for _, fm := range fms {
for _, vol := range fm.GetVolumes() {
if vol.VolumeID == volumeID {
return vol.ClusterID
}
}
}
return ""
}
// getFsNamesAndSubVolumeFromVolumeIDs gets the filesystem names and subvolumes
// from the volumeIDs present in the CreateVolumeGroupSnapshotRequest. It also
// returns the FSQuiesceClient for the filesystems present in the
// volumeIDs.
func getFsNamesAndSubVolumeFromVolumeIDs(ctx context.Context,
secret map[string]string,
volIDs []string,
cr *util.Credentials) (
map[string]core.FSQuiesceClient,
error,
) {
type fs struct {
fsName string
volumes []core.Volume
subVolumeGroupMapping map[string][]string
monitors string
}
fm := make(map[string]fs, 0)
for _, volID := range volIDs {
// Find the volume using the provided VolumeID
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx,
volID, nil, secret, "", false)
if err != nil {
return nil, err
}
volOptions.Destroy()
// choosing the monitor IPs and fsName as the unique key
// TODO: use something else as the unique key, as users can still choose
// different monitor IPs and fsName for subvolumes
uniqueName := volOptions.Monitors + volOptions.FsName
if _, ok := fm[uniqueName]; !ok {
fm[uniqueName] = fs{
fsName: volOptions.FsName,
volumes: make([]core.Volume, 0),
subVolumeGroupMapping: make(map[string][]string), // Initialize the map
monitors: volOptions.Monitors,
}
}
a := core.Volume{
VolumeID: volID,
ClusterID: volOptions.ClusterID,
}
// Retrieve the value, modify it, and assign it back
val := fm[uniqueName]
val.volumes = append(val.volumes, a)
existingVolIDInMap := val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup]
val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] = append(
existingVolIDInMap,
volOptions.SubVolume.VolID)
fm[uniqueName] = val
}
fsk := map[string]core.FSQuiesceClient{}
var err error
defer func() {
if err != nil {
destroyFSConnections(fsk)
}
}()
for k, v := range fm {
conn := &util.ClusterConnection{}
if err = conn.Connect(v.monitors, cr); err != nil {
return nil, err
}
fsk[k], err = core.NewFSQuiesce(v.fsName, v.volumes, v.subVolumeGroupMapping, conn)
if err != nil {
log.ErrorLog(ctx, "failed to get subvolume quiesce: %v", err)
conn.Destroy()
return nil, err
}
}
return fsk, nil
}
// destroyFSConnections destroys connections of all FSQuiesceClient.
func destroyFSConnections(fsMap map[string]core.FSQuiesceClient) {
for _, fm := range fsMap {
if fm != nil {
fm.Destroy()
}
}
}
// matchesSourceVolumeIDs checks if the sourceVolumeIDs and volumeIDsInOMap are
// equal.
func matchesSourceVolumeIDs(sourceVolumeIDs, volumeIDsInOMap []string) bool {
// sort the array as its required for slices.Equal call.
sort.Strings(sourceVolumeIDs)
sort.Strings(volumeIDsInOMap)
return slices.Equal(sourceVolumeIDs, volumeIDsInOMap)
}
// deleteSnapshotsAndUndoReservation deletes the snapshots and undoes the
// volume group reservation. It also resets the quiesce of the subvolumes and
// subvolume groups in the filesystems for the volumeIDs present in the
// CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Context,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
fsMap map[string]core.FSQuiesceClient,
secrets map[string]string,
) error {
// get the omap from the snapshot and volume mapping
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, vgs.VolumeGroupSnapshotID, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options from id: %v", err)
return err
}
defer vgo.Destroy()
for volID, snapID := range vgsi.VolumeSnapshotMap {
// delete the snapshots
req := &csi.DeleteSnapshotRequest{
SnapshotId: snapID,
Secrets: secrets,
}
_, err = cs.DeleteSnapshot(ctx, req)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot: %v", err)
return err
}
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
// remove the entry from the omap
err = j.RemoveVolumeSnapshotMapping(
ctx,
vgo.MetadataPool,
vgsi.ReservedID,
volID)
j.Destroy()
if err != nil {
log.ErrorLog(ctx, "failed to remove volume snapshot mapping: %v", err)
return err
}
// undo the reservation
err = store.UndoVolumeGroupReservation(ctx, vgo, vgsi, cr)
if err != nil {
log.ErrorLog(ctx, "failed to undo volume group reservation: %v", err)
return err
}
}
for _, fm := range fsMap {
_, err := fm.ResetFSQuiesce(ctx, vgs.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to reset filesystem quiesce: %v", err)
return err
}
}
return nil
}
// validateVolumeGroupSnapshotDeleteRequest validates the request for deleting a group
// snapshot of volumes.
func (cs *ControllerServer) validateVolumeGroupSnapshotDeleteRequest(
ctx context.Context,
req *csi.DeleteVolumeGroupSnapshotRequest,
) error {
if err := cs.Driver.ValidateGroupControllerServiceRequest(
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
return err
}
// Check sanity of the request's volume group snapshot ID.
if req.GetGroupSnapshotId() == "" {
return status.Error(codes.InvalidArgument, "volume group snapshot id cannot be empty")
}
return nil
}
// DeleteVolumeGroupSnapshot deletes a group snapshot of volumes.
func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context,
req *csi.DeleteVolumeGroupSnapshotRequest) (
*csi.DeleteVolumeGroupSnapshotResponse,
error,
) {
if err := cs.validateVolumeGroupSnapshotDeleteRequest(ctx, req); err != nil {
return nil, err
}
groupSnapshotID := req.GroupSnapshotId
// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
}
defer cs.VolumeGroupLocks.Release(groupSnapshotID)
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, req.GroupSnapshotId, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
vgo.Destroy()
volIds := vgsi.GetVolumeIDs()
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), volIds, cr)
err = extractDeleteVolumeGroupError(err)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
defer destroyFSConnections(fsMap)
err = cs.deleteSnapshotsAndUndoReservation(ctx, vgsi, cr, fsMap, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
// extractDeleteVolumeGroupError extracts the error from the delete volume
// group snapshot and returns the error if it is not an ErrKeyNotFound or
// ErrPoolNotFound error.
func extractDeleteVolumeGroupError(err error) error {
switch {
case errors.Is(err, util.ErrPoolNotFound):
// if the error is ErrPoolNotFound, the pool is already deleted and we don't
// need to worry about deleting snapshot or omap data, return success
return nil
case errors.Is(err, util.ErrKeyNotFound):
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
return nil
}
return err
}
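A small hypothetical test sketch (not part of this change, assuming the usual _test.go imports of errors, fmt, and testing) showing the intended behaviour: wrapped ErrPoolNotFound or ErrKeyNotFound errors are swallowed so deletion stays idempotent, while any other error is propagated.

func TestExtractDeleteVolumeGroupError(t *testing.T) {
	t.Parallel()
	// A missing pool means everything was already cleaned up: treat it as success.
	wrapped := fmt.Errorf("fetching group options: %w", util.ErrPoolNotFound)
	if err := extractDeleteVolumeGroupError(wrapped); err != nil {
		t.Errorf("expected nil for ErrPoolNotFound, got %v", err)
	}
	// Unrelated errors must be returned to the caller unchanged.
	if err := extractDeleteVolumeGroupError(errors.New("rados timeout")); err == nil {
		t.Error("expected the original error to be returned")
	}
}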

View File

@ -0,0 +1,121 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"testing"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestControllerServer_validateCreateVolumeGroupSnapshotRequest(t *testing.T) {
t.Parallel()
cs := ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(
csicommon.NewCSIDriver("cephfs.csi.ceph.com", "1.0.0", "test")),
}
type args struct {
ctx context.Context
req *csi.CreateVolumeGroupSnapshotRequest
}
tests := []struct {
name string
args args
wantErr bool
code codes.Code
}{
{
"valid CreateVolumeGroupSnapshotRequest",
args{
context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
Name: "vg-snap-1",
SourceVolumeIds: []string{"vg-1"},
Parameters: map[string]string{
"clusterID": "value",
"fsName": "value",
},
},
},
false,
codes.OK,
},
{
"empty request name in CreateVolumeGroupSnapshotRequest",
args{
context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
SourceVolumeIds: []string{"vg-1"},
},
},
true,
codes.InvalidArgument,
},
{
"empty SourceVolumeIds in CreateVolumeGroupSnapshotRequest",
args{
context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
Name: "vg-snap-1",
SourceVolumeIds: []string{},
},
},
true,
codes.InvalidArgument,
},
{
"empty clusterID in CreateVolumeGroupSnapshotRequest",
args{
context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
Name: "vg-snap-1",
SourceVolumeIds: []string{"vg-1"},
Parameters: map[string]string{"fsName": "value"},
},
},
true,
codes.InvalidArgument,
},
{
"empty fsName in CreateVolumeGroupSnapshotRequest",
args{
context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
Name: "vg-snap-1",
SourceVolumeIds: []string{"vg-1"},
Parameters: map[string]string{"clusterID": "value"},
},
},
true,
codes.InvalidArgument,
},
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
t.Parallel()
err := cs.validateCreateVolumeGroupSnapshotRequest(ts.args.ctx, ts.args.req)
if ts.wantErr {
c := status.Code(err)
if c != ts.code {
t.Errorf("ControllerServer.validateVolumeGroupSnapshotRequest() error = %v, want code %v", err, c)
}
}
})
}
}

View File

@ -58,6 +58,13 @@ func (is *IdentityServer) GetPluginCapabilities(
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_GROUP_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}

View File

@ -40,6 +40,10 @@ var (
// SnapJournal is used to maintain RADOS based journals for CO generated.
// SnapshotName to backing CephFS subvolumes.
SnapJournal *journal.Config
// VolumeGroupJournal is used to maintain RADOS based journals for CO
// generated request names to CephFS snapshot group attributes.
VolumeGroupJournal journal.VolumeGroupJournalConfig
)
// VolumeIdentifier structure contains an association between the CSI VolumeID to its subvolume

View File

@ -0,0 +1,285 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"context"
"fmt"
"github.com/ceph/ceph-csi/internal/cephfs/core"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/container-storage-interface/spec/lib/go/csi"
)
type VolumeGroupOptions struct {
*VolumeOptions
}
// NewVolumeGroupOptions generates a new instance of volumeGroupOptions from the provided
// CSI request parameters.
func NewVolumeGroupOptions(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
cr *util.Credentials,
) (*VolumeGroupOptions, error) {
var (
opts = &VolumeGroupOptions{}
err error
)
volOptions := req.GetParameters()
opts.VolumeOptions, err = getVolumeOptions(volOptions)
if err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.NamePrefix, "volumeGroupNamePrefix", volOptions); err != nil {
return nil, err
}
opts.RequestName = req.GetName()
err = opts.Connect(cr)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
opts.Destroy()
}
}()
fs := core.NewFileSystem(opts.conn)
opts.FscID, err = fs.GetFscID(ctx, opts.FsName)
if err != nil {
return nil, err
}
opts.MetadataPool, err = fs.GetMetadataPool(ctx, opts.FsName)
if err != nil {
return nil, err
}
return opts, nil
}
type VolumeGroupSnapshotIdentifier struct {
ReservedID string
FsVolumeGroupSnapshotName string
VolumeGroupSnapshotID string
RequestName string
VolumeSnapshotMap map[string]string
}
// GetVolumeIDs returns the list of volumeIDs in the VolumeSnapshotMap.
func (vgsi *VolumeGroupSnapshotIdentifier) GetVolumeIDs() []string {
keys := make([]string, 0, len(vgsi.VolumeSnapshotMap))
for k := range vgsi.VolumeSnapshotMap {
keys = append(keys, k)
}
return keys
}
// NewVolumeGroupOptionsFromID generates a new instance of VolumeGroupOptions and
// VolumeGroupSnapshotIdentifier from the provided CSI volumeGroupSnapshotID.
func NewVolumeGroupOptionsFromID(
ctx context.Context,
volumeGroupSnapshotID string,
cr *util.Credentials,
) (*VolumeGroupOptions, *VolumeGroupSnapshotIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions = &VolumeGroupOptions{}
vgs VolumeGroupSnapshotIdentifier
)
// Decode the snapID first, to detect pre-provisioned snapshot before other errors
err := vi.DecomposeCSIID(volumeGroupSnapshotID)
if err != nil {
return nil, nil, cerrors.ErrInvalidVolID
}
volOptions.VolumeOptions = &VolumeOptions{}
volOptions.ClusterID = vi.ClusterID
vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID
volOptions.FscID = vi.LocationID
vgs.ReservedID = vi.ObjectUUID
if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil {
return nil, nil, fmt.Errorf(
"failed to fetch monitor list using clusterID (%s): %w",
vi.ClusterID,
err)
}
err = volOptions.Connect(cr)
if err != nil {
return nil, nil, err
}
// in case of an error, volOptions is returned, but callers may not
// expect to need to call Destroy() on it. So, make sure to release any
// resources that may have been allocated
defer func() {
if err != nil {
volOptions.Destroy()
}
}()
fs := core.NewFileSystem(volOptions.conn)
volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID)
if err != nil {
return nil, nil, err
}
volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName)
if err != nil {
return nil, nil, err
}
j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
}
defer j.Destroy()
groupAttributes, err := j.GetVolumeGroupAttributes(
ctx, volOptions.MetadataPool, vi.ObjectUUID)
if err != nil {
return nil, nil, err
}
vgs.RequestName = groupAttributes.RequestName
vgs.FsVolumeGroupSnapshotName = groupAttributes.GroupName
vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID
vgs.VolumeSnapshotMap = groupAttributes.VolumeSnapshotMap
return volOptions, &vgs, nil
}
/*
CheckVolumeGroupSnapExists checks to determine if passed in RequestName in
volGroupOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information
regarding volume group snapshot names as requested by the CSI drivers. Hence,
these need to be invoked only when the respective CSI driver generated volume
group snapshot name based locks are held, as otherwise racy access to these
omaps may end up leaving them in an inconsistent state.
*/
func CheckVolumeGroupSnapExists(
ctx context.Context,
volOptions *VolumeGroupOptions,
cr *util.Credentials,
) (*VolumeGroupSnapshotIdentifier, error) {
// Connect to cephfs' default radosNamespace (csi)
j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
volGroupData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix)
if err != nil {
return nil, err
}
if volGroupData == nil {
return nil, nil
}
vgs := &VolumeGroupSnapshotIdentifier{}
vgs.RequestName = volOptions.RequestName
vgs.ReservedID = volGroupData.GroupUUID
vgs.FsVolumeGroupSnapshotName = volGroupData.GroupName
vgs.VolumeSnapshotMap = volGroupData.VolumeGroupAttributes.VolumeSnapshotMap
// found a snapshot already available, process and return it!
vgs.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, volGroupData.GroupUUID)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Found existing volume group snapshot (%s) with UUID (%s) for request (%s) and mapping %v",
vgs.RequestName, volGroupData.GroupUUID, vgs.RequestName, vgs.VolumeSnapshotMap)
return vgs, nil
}
// ReserveVolumeGroup is a helper routine to request a UUID reservation for the
// CSI request name and to generate the volume group snapshot identifier
// for the reserved UUID.
func ReserveVolumeGroup(
ctx context.Context,
volOptions *VolumeGroupOptions,
cr *util.Credentials,
) (*VolumeGroupSnapshotIdentifier, error) {
var (
vgsi VolumeGroupSnapshotIdentifier
groupUUID string
err error
)
vgsi.RequestName = volOptions.RequestName
// Connect to cephfs' default radosNamespace (csi)
j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
groupUUID, vgsi.FsVolumeGroupSnapshotName, err = j.ReserveName(
ctx, volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix)
if err != nil {
return nil, err
}
// generate the snapshot ID to return to the CO system
vgsi.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, groupUUID)
if err != nil {
return nil, err
}
log.DebugLog(ctx, "Generated volume group snapshot ID (%s) for request name (%s)",
vgsi.VolumeGroupSnapshotID, volOptions.RequestName)
return &vgsi, nil
}
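A minimal sketch of the check-then-reserve pattern these helpers are built for (illustrative only; handleGroupReservation is a hypothetical caller), assuming the CSI-generated request-name lock mentioned above is already held:

func handleGroupReservation(
	ctx context.Context,
	vgo *VolumeGroupOptions,
	cr *util.Credentials,
) (*VolumeGroupSnapshotIdentifier, error) {
	// Reuse an existing reservation, if any, so the operation stays idempotent.
	vgs, err := CheckVolumeGroupSnapExists(ctx, vgo, cr)
	if err != nil {
		return nil, err
	}
	if vgs != nil {
		return vgs, nil
	}
	// Nothing reserved yet: reserve a new UUID and generate the group snapshot ID.
	return ReserveVolumeGroup(ctx, vgo, cr)
}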
// UndoVolumeGroupReservation is a helper routine to undo a name reservation
// for a CSI volumeGroupSnapshot name.
func UndoVolumeGroupReservation(
ctx context.Context,
volOptions *VolumeGroupOptions,
vgsi *VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
) error {
// Connect to cephfs' default radosNamespace (csi)
j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
defer j.Destroy()
err = j.UndoReservation(ctx, volOptions.MetadataPool,
vgsi.FsVolumeGroupSnapshotName, vgsi.RequestName)
return err
}

View File

@ -209,10 +209,32 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error {
optName, actual, expected)
}
// getVolumeOptions validates the basic required options provided in the
// volume parameters and extracts the VolumeOptions from them.
// It contains the following checks:
// - clusterID must be set
// - monitors must be set
// - fsName must be set.
func getVolumeOptions(vo map[string]string) (*VolumeOptions, error) {
opts := VolumeOptions{}
clusterData, err := GetClusterInformation(vo)
if err != nil {
return nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
if err = extractOption(&opts.FsName, "fsName", vo); err != nil {
return nil, err
}
return &opts, nil
}
// NewVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func NewVolumeOptions(
ctx context.Context,
requestName,
@ -222,20 +244,17 @@ func NewVolumeOptions(
cr *util.Credentials,
) (*VolumeOptions, error) {
var (
opts VolumeOptions
opts *VolumeOptions
backingSnapshotBool string
err error
)
volOptions := req.GetParameters()
clusterData, err := GetClusterInformation(volOptions)
opts, err = getVolumeOptions(volOptions)
if err != nil {
return nil, err
}
opts.ClusterID = clusterData.ClusterID
opts.Monitors = strings.Join(clusterData.Monitors, ",")
opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
opts.Owner = k8s.GetOwner(volOptions)
opts.BackingSnapshot = IsShallowVolumeSupported(req)
@ -247,10 +266,6 @@ func NewVolumeOptions(
return nil, err
}
if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil {
return nil, err
}
if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", volOptions); err != nil {
return nil, err
}
@ -323,7 +338,7 @@ func NewVolumeOptions(
}
}
return &opts, nil
return opts, nil
}
// IsShallowVolumeSupported returns true only for ReadOnly volume requests

View File

@ -29,6 +29,7 @@ import (
// DefaultControllerServer points to default driver.
type DefaultControllerServer struct {
csi.UnimplementedControllerServer
csi.UnimplementedGroupControllerServer
Driver *CSIDriver
}

View File

@ -32,15 +32,7 @@ const (
)
type VolumeGroupJournal interface {
// Connect establishes a new connection to a ceph cluster for journal metadata.
Connect(
monitors,
namespace string,
cr *util.Credentials) error
// Destroy frees any resources and invalidates the journal connection.
Destroy()
// SetNamespace sets the namespace for the journal.
SetNamespace(ns string)
CheckReservation(
ctx context.Context,
journalPool,
@ -78,16 +70,20 @@ type VolumeGroupJournal interface {
volumeID string) error
}
// volumeGroupJournalConfig contains the configuration and connection details.
type volumeGroupJournalConfig struct {
*Config
*Connection
// VolumeGroupJournalConfig contains the configuration.
type VolumeGroupJournalConfig struct {
Config
}
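// VolumeGroupJournalConnection holds the config together with an established
// journal connection.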
type VolumeGroupJournalConnection struct {
config *VolumeGroupJournalConfig
connection *Connection
}
// NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups.
func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal {
return &volumeGroupJournalConfig{
Config: &Config{
func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig {
return VolumeGroupJournalConfig{
Config: Config{
csiDirectory: "csi.groups." + suffix,
csiNameKeyPrefix: "csi.volume.group.",
cephUUIDDirectoryPrefix: "csi.volume.group.",
@ -98,35 +94,42 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal {
}
}
func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) {
sgj.Config.namespace = ns
// SetNamespace sets the namespace for the journal.
func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) {
vgc.Config.namespace = ns
}
// NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for
// volume groups using a predetermined namespace value.
func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal {
func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig {
j := NewCSIVolumeGroupJournal(suffix)
j.SetNamespace(ns)
return j
}
func (sgj *volumeGroupJournalConfig) Connect(
// Connect establishes a new connection to a ceph cluster for journal metadata.
func (vgc *VolumeGroupJournalConfig) Connect(
monitors,
namespace string,
cr *util.Credentials,
) error {
conn, err := sgj.Config.Connect(monitors, namespace, cr)
if err != nil {
return err
) (VolumeGroupJournal, error) {
vgjc := &VolumeGroupJournalConnection{}
vgjc.config = &VolumeGroupJournalConfig{
Config: vgc.Config,
}
sgj.Connection = conn
conn, err := vgc.Config.Connect(monitors, namespace, cr)
if err != nil {
return nil, err
}
vgjc.connection = conn
return nil
return vgjc, nil
}
func (sgj *volumeGroupJournalConfig) Destroy() {
sgj.Connection.Destroy()
// Destroy frees any resources and invalidates the journal connection.
func (vgjc *VolumeGroupJournalConnection) Destroy() {
vgjc.connection.Destroy()
}
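With this refactor the package-level journal config stays immutable and Connect hands back a fresh connection for each call. A minimal usage sketch (illustrative only; groupJournal and withGroupJournal are hypothetical names):

var groupJournal = NewCSIVolumeGroupJournalWithNamespace("mydriver", "csi")

func withGroupJournal(monitors string, cr *util.Credentials, fn func(VolumeGroupJournal) error) error {
	// Connect does not mutate groupJournal; it returns a per-call connection.
	j, err := groupJournal.Connect(monitors, "csi", cr)
	if err != nil {
		return err
	}
	defer j.Destroy()
	return fn(j)
}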
// VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a
@ -162,11 +165,11 @@ Return values:
reservation found.
- error: non-nil in case of any errors.
*/
func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context,
journalPool, reqName, namePrefix string,
) (*VolumeGroupData, error) {
var (
cj = sgj.Config
cj = vgjc.config
volGroupData = &VolumeGroupData{}
)
@ -175,7 +178,7 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
cj.csiNameKeyPrefix + reqName,
}
values, err := getOMapValues(
ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory,
ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory,
cj.commonPrefix, fetchKeys)
if err != nil {
if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) {
@ -195,13 +198,13 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
}
volGroupData.GroupUUID = objUUID
savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool,
savedVolumeGroupAttributes, err := vgjc.GetVolumeGroupAttributes(ctx, journalPool,
objUUID)
if err != nil {
// error should specifically be not found, for image to be absent, any other error
// is not conclusive, and we should not proceed
if errors.Is(err, util.ErrKeyNotFound) {
err = sgj.UndoReservation(ctx, journalPool,
err = vgjc.UndoReservation(ctx, journalPool,
generateVolumeGroupName(namePrefix, objUUID), reqName)
}
@ -239,11 +242,11 @@ Input arguments:
- groupID: ID of the volume group, generated from the UUID
- reqName: Request name for the volume group
*/
func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context,
csiJournalPool, groupID, reqName string,
) error {
// delete volume UUID omap (first, inverse of create order)
cj := sgj.Config
cj := vgjc.config
if groupID != "" {
if len(groupID) < uuidEncodedLength {
return fmt.Errorf("unable to parse UUID from %s, too short", groupID)
@ -256,8 +259,8 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
err := util.RemoveObject(
ctx,
sgj.Connection.monitors,
sgj.Connection.cr,
vgjc.connection.monitors,
vgjc.connection.cr,
csiJournalPool,
cj.namespace,
cj.cephUUIDDirectoryPrefix+groupUUID)
@ -271,7 +274,7 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
}
// delete the request name key (last, inverse of create order)
err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory,
err := removeMapKeys(ctx, vgjc.connection, csiJournalPool, cj.namespace, cj.csiDirectory,
[]string{cj.csiNameKeyPrefix + reqName})
if err != nil {
log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
@ -299,11 +302,11 @@ Return values:
- string: Contains the VolumeGroup name that was reserved for the passed in reqName
- error: non-nil in case of any errors
*/
func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context,
journalPool string, journalPoolID int64,
reqName, namePrefix string,
) (string, string, error) {
cj := sgj.Config
cj := vgjc.config
// Create the UUID based omap first, to reserve the same and avoid conflicts
// NOTE: If any service loss occurs post creation of the UUID directory, and before
@ -311,8 +314,8 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
// UUID directory key will be leaked
objUUID, err := reserveOMapName(
ctx,
sgj.Connection.monitors,
sgj.Connection.cr,
vgjc.connection.monitors,
vgjc.connection.cr,
journalPool,
cj.namespace,
cj.cephUUIDDirectoryPrefix,
@ -325,7 +328,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
// After generating the UUID Directory omap, we populate the csiDirectory
// omap with a key-value entry to map the request to the backend volume group:
// `csiNameKeyPrefix + reqName: nameKeyVal`
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory,
err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory,
map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal})
if err != nil {
return "", "", err
@ -333,7 +336,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
defer func() {
if err != nil {
log.WarningLog(ctx, "reservation failed for volume group: %s", reqName)
errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName)
errDefer := vgjc.UndoReservation(ctx, journalPool, groupName, reqName)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer)
}
@ -347,7 +350,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
omapValues[cj.csiNameKey] = reqName
omapValues[cj.csiImageKey] = groupName
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues)
err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, oid, omapValues)
if err != nil {
return "", "", err
}
@ -363,18 +366,18 @@ type VolumeGroupAttributes struct {
VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping
}
func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes(
func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes(
ctx context.Context,
pool, objectUUID string,
) (*VolumeGroupAttributes, error) {
var (
err error
groupAttributes = &VolumeGroupAttributes{}
cj = sgj.Config
cj = vgjc.config
)
values, err := listOMapValues(
ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
ctx, vgjc.connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
cj.commonPrefix)
if err != nil {
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
@ -398,14 +401,14 @@ func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes(
return groupAttributes, nil
}
func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping(
func (vgjc *VolumeGroupJournalConnection) AddVolumeSnapshotMapping(
ctx context.Context,
pool,
reservedUUID,
volumeID,
snapshotID string,
) error {
err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID,
err := setOMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, vgjc.config.cephUUIDDirectoryPrefix+reservedUUID,
map[string]string{volumeID: snapshotID})
if err != nil {
log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err)
@ -416,13 +419,14 @@ func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping(
return nil
}
func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping(
func (vgjc *VolumeGroupJournalConnection) RemoveVolumeSnapshotMapping(
ctx context.Context,
pool,
reservedUUID,
volumeID string,
) error {
err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID,
err := removeMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace,
vgjc.config.cephUUIDDirectoryPrefix+reservedUUID,
[]string{volumeID})
if err != nil {
log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err)

View File

@ -12547,6 +12547,9 @@ var awsPartition = partition{
endpointKey{
Region: "eu-south-1",
}: endpoint{},
endpointKey{
Region: "eu-south-2",
}: endpoint{},
endpointKey{
Region: "eu-west-1",
}: endpoint{},
@ -14554,6 +14557,9 @@ var awsPartition = partition{
endpointKey{
Region: "ca-central-1",
}: endpoint{},
endpointKey{
Region: "ca-west-1",
}: endpoint{},
endpointKey{
Region: "eu-central-1",
}: endpoint{},
@ -19213,66 +19219,222 @@ var awsPartition = partition{
endpointKey{
Region: "af-south-1",
}: endpoint{},
endpointKey{
Region: "af-south-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.af-south-1.api.aws",
},
endpointKey{
Region: "ap-east-1",
}: endpoint{},
endpointKey{
Region: "ap-east-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-east-1.api.aws",
},
endpointKey{
Region: "ap-northeast-1",
}: endpoint{},
endpointKey{
Region: "ap-northeast-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-northeast-1.api.aws",
},
endpointKey{
Region: "ap-northeast-2",
}: endpoint{},
endpointKey{
Region: "ap-northeast-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-northeast-2.api.aws",
},
endpointKey{
Region: "ap-northeast-3",
}: endpoint{},
endpointKey{
Region: "ap-northeast-3",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-northeast-3.api.aws",
},
endpointKey{
Region: "ap-south-1",
}: endpoint{},
endpointKey{
Region: "ap-south-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-south-1.api.aws",
},
endpointKey{
Region: "ap-south-2",
}: endpoint{},
endpointKey{
Region: "ap-south-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-south-2.api.aws",
},
endpointKey{
Region: "ap-southeast-1",
}: endpoint{},
endpointKey{
Region: "ap-southeast-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-southeast-1.api.aws",
},
endpointKey{
Region: "ap-southeast-2",
}: endpoint{},
endpointKey{
Region: "ap-southeast-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-southeast-2.api.aws",
},
endpointKey{
Region: "ap-southeast-3",
}: endpoint{},
endpointKey{
Region: "ap-southeast-3",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-southeast-3.api.aws",
},
endpointKey{
Region: "ap-southeast-4",
}: endpoint{},
endpointKey{
Region: "ap-southeast-4",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ap-southeast-4.api.aws",
},
endpointKey{
Region: "ca-central-1",
}: endpoint{},
endpointKey{
Region: "ca-central-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ca-central-1.api.aws",
},
endpointKey{
Region: "ca-central-1",
Variant: fipsVariant,
}: endpoint{
Hostname: "logs-fips.ca-central-1.amazonaws.com",
},
endpointKey{
Region: "ca-west-1",
}: endpoint{},
endpointKey{
Region: "ca-west-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.ca-west-1.api.aws",
},
endpointKey{
Region: "ca-west-1",
Variant: fipsVariant,
}: endpoint{
Hostname: "logs-fips.ca-west-1.amazonaws.com",
},
endpointKey{
Region: "eu-central-1",
}: endpoint{},
endpointKey{
Region: "eu-central-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-central-1.api.aws",
},
endpointKey{
Region: "eu-central-2",
}: endpoint{},
endpointKey{
Region: "eu-central-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-central-2.api.aws",
},
endpointKey{
Region: "eu-north-1",
}: endpoint{},
endpointKey{
Region: "eu-north-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-north-1.api.aws",
},
endpointKey{
Region: "eu-south-1",
}: endpoint{},
endpointKey{
Region: "eu-south-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-south-1.api.aws",
},
endpointKey{
Region: "eu-south-2",
}: endpoint{},
endpointKey{
Region: "eu-south-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-south-2.api.aws",
},
endpointKey{
Region: "eu-west-1",
}: endpoint{},
endpointKey{
Region: "eu-west-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-west-1.api.aws",
},
endpointKey{
Region: "eu-west-2",
}: endpoint{},
endpointKey{
Region: "eu-west-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-west-2.api.aws",
},
endpointKey{
Region: "eu-west-3",
}: endpoint{},
endpointKey{
Region: "eu-west-3",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.eu-west-3.api.aws",
},
endpointKey{
Region: "fips-ca-central-1",
}: endpoint{
Hostname: "logs-fips.ca-central-1.amazonaws.com",
CredentialScope: credentialScope{
Region: "ca-central-1",
},
Deprecated: boxedTrue,
},
endpointKey{
Region: "fips-ca-west-1",
}: endpoint{
Hostname: "logs-fips.ca-west-1.amazonaws.com",
CredentialScope: credentialScope{
Region: "ca-west-1",
},
Deprecated: boxedTrue,
},
endpointKey{
Region: "fips-us-east-1",
}: endpoint{
@ -19312,18 +19474,48 @@ var awsPartition = partition{
endpointKey{
Region: "il-central-1",
}: endpoint{},
endpointKey{
Region: "il-central-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.il-central-1.api.aws",
},
endpointKey{
Region: "me-central-1",
}: endpoint{},
endpointKey{
Region: "me-central-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.me-central-1.api.aws",
},
endpointKey{
Region: "me-south-1",
}: endpoint{},
endpointKey{
Region: "me-south-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.me-south-1.api.aws",
},
endpointKey{
Region: "sa-east-1",
}: endpoint{},
endpointKey{
Region: "sa-east-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.sa-east-1.api.aws",
},
endpointKey{
Region: "us-east-1",
}: endpoint{},
endpointKey{
Region: "us-east-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.us-east-1.api.aws",
},
endpointKey{
Region: "us-east-1",
Variant: fipsVariant,
@ -19333,6 +19525,12 @@ var awsPartition = partition{
endpointKey{
Region: "us-east-2",
}: endpoint{},
endpointKey{
Region: "us-east-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.us-east-2.api.aws",
},
endpointKey{
Region: "us-east-2",
Variant: fipsVariant,
@ -19342,6 +19540,12 @@ var awsPartition = partition{
endpointKey{
Region: "us-west-1",
}: endpoint{},
endpointKey{
Region: "us-west-1",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.us-west-1.api.aws",
},
endpointKey{
Region: "us-west-1",
Variant: fipsVariant,
@ -19351,6 +19555,12 @@ var awsPartition = partition{
endpointKey{
Region: "us-west-2",
}: endpoint{},
endpointKey{
Region: "us-west-2",
Variant: dualStackVariant,
}: endpoint{
Hostname: "logs.us-west-2.api.aws",
},
endpointKey{
Region: "us-west-2",
Variant: fipsVariant,

View File

@ -5,4 +5,4 @@ package aws
const SDKName = "aws-sdk-go"
// SDKVersion is the version of this SDK
const SDKVersion = "1.50.26"
const SDKVersion = "1.50.32"

View File

@ -0,0 +1,135 @@
//go:build ceph_preview
package admin
import "fmt"
// fixedPointFloat is a custom type that implements the MarshalJSON interface.
// This is used to format float64 values to two decimal places.
// By default these get converted to integers in the JSON output and
// fail the command.
type fixedPointFloat float64
// MarshalJSON provides a custom implementation for the JSON marshalling
// of fixedPointFloat. It formats the float to two decimal places.
func (fpf fixedPointFloat) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("%.2f", float64(fpf))), nil
}
// fSQuiesceFields is the internal type used to create JSON for ceph.
// See FSQuiesceOptions for the type that users of the library
// interact with.
type fSQuiesceFields struct {
Prefix string `json:"prefix"`
VolName string `json:"vol_name"`
GroupName string `json:"group_name,omitempty"`
Members []string `json:"members,omitempty"`
SetId string `json:"set_id,omitempty"`
Timeout fixedPointFloat `json:"timeout,omitempty"`
Expiration fixedPointFloat `json:"expiration,omitempty"`
AwaitFor fixedPointFloat `json:"await_for,omitempty"`
Await bool `json:"await,omitempty"`
IfVersion int `json:"if_version,omitempty"`
Include bool `json:"include,omitempty"`
Exclude bool `json:"exclude,omitempty"`
Reset bool `json:"reset,omitempty"`
Release bool `json:"release,omitempty"`
Query bool `json:"query,omitempty"`
All bool `json:"all,omitempty"`
Cancel bool `json:"cancel,omitempty"`
}
// FSQuiesceOptions are used to specify optional, non-identifying, values
// to be used when quiescing a cephfs volume.
type FSQuiesceOptions struct {
Timeout float64
Expiration float64
AwaitFor float64
Await bool
IfVersion int
Include bool
Exclude bool
Reset bool
Release bool
Query bool
All bool
Cancel bool
}
// toFields is used to convert the FSQuiesceOptions to the internal
// fSQuiesceFields type.
func (o *FSQuiesceOptions) toFields(volume, group string, subvolumes []string, setId string) *fSQuiesceFields {
return &fSQuiesceFields{
Prefix: "fs quiesce",
VolName: volume,
GroupName: group,
Members: subvolumes,
SetId: setId,
Timeout: fixedPointFloat(o.Timeout),
Expiration: fixedPointFloat(o.Expiration),
AwaitFor: fixedPointFloat(o.AwaitFor),
Await: o.Await,
IfVersion: o.IfVersion,
Include: o.Include,
Exclude: o.Exclude,
Reset: o.Reset,
Release: o.Release,
Query: o.Query,
All: o.All,
Cancel: o.Cancel,
}
}
// QuiesceState is used to report the state of a quiesced fs volume.
type QuiesceState struct {
Name string `json:"name"`
Age float64 `json:"age"`
}
// QuiesceInfoMember reports the state of a single member of a quiesce set.
// This is part of the sets' members object array in the JSON.
type QuiesceInfoMember struct {
Excluded bool `json:"excluded"`
State QuiesceState `json:"state"`
}
// QuiesceInfo reports various informational values about a quiesced volume.
// This is returned as the sets object array in the JSON.
type QuiesceInfo struct {
Version int `json:"version"`
AgeRef float64 `json:"age_ref"`
State QuiesceState `json:"state"`
Timeout float64 `json:"timeout"`
Expiration float64 `json:"expiration"`
Members map[string]QuiesceInfoMember `json:"members"`
}
// FSQuiesceInfo reports various informational values about quiesced volumes.
type FSQuiesceInfo struct {
Epoch int `json:"epoch"`
SetVersion int `json:"set_version"`
Sets map[string]QuiesceInfo `json:"sets"`
}
// parseFSQuiesceInfo is used to parse the response from the quiesce command. It returns a FSQuiesceInfo object.
func parseFSQuiesceInfo(res response) (*FSQuiesceInfo, error) {
var info FSQuiesceInfo
if err := res.NoStatus().Unmarshal(&info).End(); err != nil {
return nil, err
}
return &info, nil
}
// FSQuiesce will quiesce the specified subvolumes in a volume.
// Quiescing a fs will prevent new writes to the subvolumes.
// Similar To:
//
// ceph fs quiesce <volume>
func (fsa *FSAdmin) FSQuiesce(volume, group string, subvolumes []string, setId string, o *FSQuiesceOptions) (*FSQuiesceInfo, error) {
if o == nil {
o = &FSQuiesceOptions{}
}
f := o.toFields(volume, group, subvolumes, setId)
return parseFSQuiesceInfo(fsa.marshalMgrCommand(f))
}
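A minimal sketch of how the new FSQuiesce API might be called from client code (it requires building with the ceph_preview tag). The cluster connection and FSAdmin construction via admin.NewFromConn are standard go-ceph calls that are not part of this diff, and the volume, group, and subvolume names below are placeholders.

//go:build ceph_preview

package main

import (
	"fmt"

	fsadmin "github.com/ceph/go-ceph/cephfs/admin"
	"github.com/ceph/go-ceph/rados"
)

func main() {
	// Connect to the cluster using the default ceph.conf and keyring.
	conn, err := rados.NewConn()
	if err != nil {
		panic(err)
	}
	if err := conn.ReadDefaultConfigFile(); err != nil {
		panic(err)
	}
	if err := conn.Connect(); err != nil {
		panic(err)
	}
	defer conn.Shutdown()

	fsa := fsadmin.NewFromConn(conn)

	// Quiesce two subvolumes in group "csi" of volume "cephfs" and wait up
	// to 10 seconds for writes to be paused.
	info, err := fsa.FSQuiesce("cephfs", "csi", []string{"subvol-1", "subvol-2"}, "",
		&fsadmin.FSQuiesceOptions{Await: true, AwaitFor: 10})
	if err != nil {
		panic(err)
	}
	for name, set := range info.Sets {
		fmt.Printf("quiesce set %s: state=%s, members=%d\n", name, set.State.Name, len(set.Members))
	}
}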

View File

@ -0,0 +1,53 @@
//go:build ceph_preview
package rbd
// #cgo LDFLAGS: -lrbd
// #include <rbd/librbd.h>
import "C"
// SnapGroupNamespace provides details about a single snapshot that was taken
// as part of an RBD group.
type SnapGroupNamespace struct {
Pool uint64
GroupName string
GroupSnapName string
}
// GetSnapGroupNamespace returns the SnapGroupNamespace of the snapshot which
// is part of a group. The caller should make sure that the snapshot ID passed
// to this function belongs to a snapshot that was taken as part of a group
// snapshot.
//
// Implements:
//
// int rbd_snap_get_group_namespace(rbd_image_t image, uint64_t snap_id,
// rbd_snap_group_namespace_t *group_snap,
// size_t group_snap_size)
func (image *Image) GetSnapGroupNamespace(snapID uint64) (*SnapGroupNamespace, error) {
if err := image.validate(imageIsOpen); err != nil {
return nil, err
}
var (
err error
sgn C.rbd_snap_group_namespace_t
)
ret := C.rbd_snap_get_group_namespace(image.image,
C.uint64_t(snapID),
&sgn,
C.sizeof_rbd_snap_group_namespace_t)
err = getError(ret)
if err != nil {
return nil, err
}
defer C.rbd_snap_group_namespace_cleanup(&sgn, C.sizeof_rbd_snap_group_namespace_t)
return &SnapGroupNamespace{
Pool: uint64(sgn.group_pool),
GroupName: C.GoString(sgn.group_name),
GroupSnapName: C.GoString(sgn.group_snap_name),
}, nil
}
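A short usage sketch for the new call (also behind the ceph_preview tag): it assumes an IOContext for the image's pool is already available and that snapID identifies a snapshot created through an RBD group snapshot; the image name and helper are placeholders, not part of the commit.

//go:build ceph_preview

package example

import (
	"fmt"

	"github.com/ceph/go-ceph/rados"
	"github.com/ceph/go-ceph/rbd"
)

// printSnapGroupInfo reports which RBD group and group snapshot a snapshot
// belongs to. snapID must come from a snapshot taken as part of a group
// snapshot, as required by GetSnapGroupNamespace.
func printSnapGroupInfo(ioctx *rados.IOContext, imageName string, snapID uint64) error {
	img, err := rbd.OpenImageReadOnly(ioctx, imageName, rbd.NoSnapshot)
	if err != nil {
		return err
	}
	defer img.Close()

	sgn, err := img.GetSnapGroupNamespace(snapID)
	if err != nil {
		return err
	}
	fmt.Printf("snapshot %d: group=%q group-snap=%q pool-id=%d\n",
		snapID, sgn.GroupName, sgn.GroupSnapName, sgn.Pool)

	return nil
}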

4
vendor/modules.txt vendored
View File

@ -81,7 +81,7 @@ github.com/antlr/antlr4/runtime/Go/antlr/v4
# github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
## explicit
github.com/asaskevich/govalidator
# github.com/aws/aws-sdk-go v1.50.26
# github.com/aws/aws-sdk-go v1.50.32
## explicit; go 1.19
github.com/aws/aws-sdk-go/aws
github.com/aws/aws-sdk-go/aws/auth/bearer
@ -204,7 +204,7 @@ github.com/ceph/ceph-csi/api/deploy/kubernetes/cephfs
github.com/ceph/ceph-csi/api/deploy/kubernetes/nfs
github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd
github.com/ceph/ceph-csi/api/deploy/ocp
# github.com/ceph/go-ceph v0.26.0
# github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243
## explicit; go 1.19
github.com/ceph/go-ceph/cephfs
github.com/ceph/go-ceph/cephfs/admin