diff --git a/internal/csi-common/controllerserver-default.go b/internal/csi-common/controllerserver-default.go index fcc261d78..2ce290928 100644 --- a/internal/csi-common/controllerserver-default.go +++ b/internal/csi-common/controllerserver-default.go @@ -47,3 +47,19 @@ func (cs *DefaultControllerServer) ControllerGetCapabilities( Capabilities: cs.Driver.capabilities, }, nil } + +// GroupControllerGetCapabilities implements the default +// GroupControllerGetCapabilities GRPC callout. +func (cs *DefaultControllerServer) GroupControllerGetCapabilities( + ctx context.Context, + req *csi.GroupControllerGetCapabilitiesRequest, +) (*csi.GroupControllerGetCapabilitiesResponse, error) { + log.TraceLog(ctx, "Using default GroupControllerGetCapabilities") + if cs.Driver == nil { + return nil, status.Error(codes.Unimplemented, "Group controller server is not enabled") + } + + return &csi.GroupControllerGetCapabilitiesResponse{ + Capabilities: cs.Driver.groupCapabilities, + }, nil +} diff --git a/internal/csi-common/driver.go b/internal/csi-common/driver.go index 31c89070e..4062845b4 100644 --- a/internal/csi-common/driver.go +++ b/internal/csi-common/driver.go @@ -31,9 +31,10 @@ type CSIDriver struct { nodeID string version string // topology constraints that this nodeserver will advertise - topology map[string]string - capabilities []*csi.ControllerServiceCapability - vc []*csi.VolumeCapability_AccessMode + topology map[string]string + capabilities []*csi.ControllerServiceCapability + groupCapabilities []*csi.GroupControllerServiceCapability + vc []*csi.VolumeCapability_AccessMode } // NewCSIDriver Creates a NewCSIDriver object. Assumes vendor @@ -116,3 +117,34 @@ func (d *CSIDriver) AddVolumeCapabilityAccessModes( func (d *CSIDriver) GetVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode { return d.vc } + +// AddGroupControllerServiceCapabilities stores the group controller capabilities +// in driver object. 
+func (d *CSIDriver) AddGroupControllerServiceCapabilities(cl []csi.GroupControllerServiceCapability_RPC_Type) { + csc := make([]*csi.GroupControllerServiceCapability, 0, len(cl)) + + for _, c := range cl { + log.DefaultLog("Enabling group controller service capability: %v", c.String()) + csc = append(csc, NewGroupControllerServiceCapability(c)) + } + + d.groupCapabilities = csc +} + +// ValidateGroupControllerServiceRequest validates the group controller +// plugin capabilities. +// +//nolint:interfacer // c can be of type fmt.Stringer, but that does not make the API clearer +func (d *CSIDriver) ValidateGroupControllerServiceRequest(c csi.GroupControllerServiceCapability_RPC_Type) error { + if c == csi.GroupControllerServiceCapability_RPC_UNKNOWN { + return nil + } + + for _, capability := range d.groupCapabilities { + if c == capability.GetRpc().GetType() { + return nil + } + } + + return status.Error(codes.InvalidArgument, c.String()) +} diff --git a/internal/csi-common/server.go b/internal/csi-common/server.go index 13157f222..727ef57f8 100644 --- a/internal/csi-common/server.go +++ b/internal/csi-common/server.go @@ -45,6 +45,7 @@ type Servers struct { IS csi.IdentityServer CS csi.ControllerServer NS csi.NodeServer + GS csi.GroupControllerServer } // NewNonBlockingGRPCServer return non-blocking GRPC. 
@@ -109,6 +110,9 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) { if srv.NS != nil { csi.RegisterNodeServer(server, srv.NS) } + if srv.GS != nil { + csi.RegisterGroupControllerServer(server, srv.GS) + } log.DefaultLog("Listening for connections on address: %#v", listener.Addr()) err = server.Serve(listener) diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index 080f9df93..daf6170ee 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -95,6 +95,18 @@ func NewControllerServiceCapability(ctrlCap csi.ControllerServiceCapability_RPC_ } } +// NewGroupControllerServiceCapability returns group controller capabilities. +func NewGroupControllerServiceCapability(ctrlCap csi.GroupControllerServiceCapability_RPC_Type, +) *csi.GroupControllerServiceCapability { + return &csi.GroupControllerServiceCapability{ + Type: &csi.GroupControllerServiceCapability_Rpc{ + Rpc: &csi.GroupControllerServiceCapability_RPC{ + Type: ctrlCap, + }, + }, + } +} + // NewMiddlewareServerOption creates a new grpc.ServerOption that configures a // common format for log messages and other gRPC related handlers. 
func NewMiddlewareServerOption() grpc.ServerOption { @@ -133,6 +145,13 @@ func getReqID(req interface{}) string { case *csi.NodeExpandVolumeRequest: reqID = r.VolumeId + + case *csi.CreateVolumeGroupSnapshotRequest: + reqID = r.Name + case *csi.DeleteVolumeGroupSnapshotRequest: + reqID = r.GroupSnapshotId + case *csi.GetVolumeGroupSnapshotRequest: + reqID = r.GroupSnapshotId } return reqID diff --git a/internal/csi-common/utils_test.go b/internal/csi-common/utils_test.go index e5687986a..ddb16648a 100644 --- a/internal/csi-common/utils_test.go +++ b/internal/csi-common/utils_test.go @@ -65,6 +65,16 @@ func TestGetReqID(t *testing.T) { &csi.NodeExpandVolumeRequest{ VolumeId: fakeID, }, + + &csi.CreateVolumeGroupSnapshotRequest{ + Name: fakeID, + }, + &csi.DeleteVolumeGroupSnapshotRequest{ + GroupSnapshotId: fakeID, + }, + &csi.GetVolumeGroupSnapshotRequest{ + GroupSnapshotId: fakeID, + }, } for _, r := range req { if got := getReqID(r); got != fakeID { diff --git a/internal/journal/omap.go b/internal/journal/omap.go index 9451cdee2..c62ca51de 100644 --- a/internal/journal/omap.go +++ b/internal/journal/omap.go @@ -19,6 +19,7 @@ package journal import ( "context" "errors" + "fmt" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -79,7 +80,7 @@ func getOMapValues( log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v", poolName, namespace, oid, err) - return nil, util.JoinErrors(util.ErrKeyNotFound, err) + return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err) } return nil, err @@ -168,3 +169,57 @@ func omapPoolError(err error) error { return err } + +// listOMapValues fetches all omap values for a given oid, prefix, and namespace. 
+func listOMapValues( + ctx context.Context, + conn *Connection, + poolName, namespace, oid, prefix string, +) (map[string]string, error) { + // fetch and configure the rados ioctx + ioctx, err := conn.conn.GetIoctx(poolName) + if err != nil { + return nil, omapPoolError(err) + } + defer ioctx.Destroy() + + if namespace != "" { + ioctx.SetNamespace(namespace) + } + + results := map[string]string{} + + numKeys := uint64(0) + startAfter := "" + for { + prevNumKeys := numKeys + err = ioctx.ListOmapValues( + oid, startAfter, prefix, chunkSize, + func(key string, value []byte) { + numKeys++ + startAfter = key + results[key] = string(value) + }, + ) + // if we hit an error, or no new keys were seen, exit the loop + if err != nil || numKeys == prevNumKeys { + break + } + } + + if err != nil { + if errors.Is(err, rados.ErrNotFound) { + log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v", + poolName, namespace, oid, err) + + return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err) + } + + return nil, err + } + + log.DebugLog(ctx, "got omap values: (pool=%q, namespace=%q, name=%q): %+v", + poolName, namespace, oid, results) + + return results, nil +} diff --git a/internal/journal/volumegroupjournal.go b/internal/journal/volumegroupjournal.go new file mode 100644 index 000000000..b12f31f7b --- /dev/null +++ b/internal/journal/volumegroupjournal.go @@ -0,0 +1,434 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package journal + +import ( + "context" + "errors" + "fmt" + + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/google/uuid" +) + +const ( + defaultVolumeGroupNamingPrefix string = "csi-vol-group-" +) + +type VolumeGroupJournal interface { + // Connect establishes a new connection to a ceph cluster for journal metadata. + Connect( + monitors, + namespace string, + cr *util.Credentials) (*volumeGroupJournalConfig, error) + // Destroy frees any resources and invalidates the journal connection. + Destroy() + // SetNamespace sets the namespace for the journal. + SetNamespace(ns string) + CheckReservation( + ctx context.Context, + journalPool, + reqName, + namePrefix string) (*VolumeGroupData, error) + UndoReservation( + ctx context.Context, + csiJournalPool, + snapshotGroupName, + reqName string) error + // GetVolumeGroupAttributes fetches all keys and their values, from a UUID directory, + // returning VolumeGroupAttributes structure. + GetVolumeGroupAttributes( + ctx context.Context, + pool, + objectUUID string) (*VolumeGroupAttributes, error) + ReserveName( + ctx context.Context, + journalPool string, + journalPoolID int64, + reqName, + namePrefix string) (string, string, error) + // AddVolumeSnapshotMapping adds a volumeID and snapshotID mapping to the UUID directory. + AddVolumeSnapshotMapping( + ctx context.Context, + pool, + reservedUUID, + volumeID, + snapshotID string) error + // RemoveVolumeSnapshotMapping removes a volumeID and snapshotID mapping from the UUID directory. + RemoveVolumeSnapshotMapping( + ctx context.Context, + pool, + reservedUUID, + volumeID string) error +} + +// volumeGroupJournalConfig contains the configuration and connection details. +type volumeGroupJournalConfig struct { + *Config + *Connection +} + +// NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups. 
+func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { + return &volumeGroupJournalConfig{ + Config: &Config{ + csiDirectory: "csi.groups." + suffix, + csiNameKeyPrefix: "csi.volume.group.", + cephUUIDDirectoryPrefix: "csi.volume.group.", + csiImageKey: "csi.groupname", + csiNameKey: "csi.volname", + namespace: "", + }, + } +} + +func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) { + sgj.Config.namespace = ns +} + +// NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for +// volume groups using a predetermined namespace value. +func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal { + j := NewCSIVolumeGroupJournal(suffix) + j.SetNamespace(ns) + + return j +} + +func (sgj *volumeGroupJournalConfig) Connect( + monitors, + namespace string, + cr *util.Credentials, +) (*volumeGroupJournalConfig, error) { + conn, err := sgj.Config.Connect(monitors, namespace, cr) + if err != nil { + return nil, err + } + sgj.Connection = conn + + return sgj, nil +} + +func (sgj *volumeGroupJournalConfig) Destroy() { + sgj.Connection.Destroy() +} + +// VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a +// volume group. 
+type VolumeGroupData struct { + GroupUUID string + GroupName string + VolumeGroupAttributes *VolumeGroupAttributes +} + +func generateVolumeGroupName(namePrefix, groupUUID string) string { + if namePrefix == "" { + namePrefix = defaultVolumeGroupNamingPrefix + } + + return namePrefix + groupUUID +} + +/* +CheckReservation checks if given request name contains a valid reservation + - If there is a valid reservation, then the corresponding VolumeGroupData for + the snapshot group is returned + - If there is a reservation that is stale (or not fully cleaned up), it is + garbage collected using the UndoReservation call, as appropriate + +NOTE: As the function manipulates omaps, it should be called with a lock +against the request name held, to prevent parallel operations from modifying +the state of the omaps for this request name. + +Return values: + - VolumeGroupData: which contains the GroupUUID and GroupSnapshotAttributes + that were reserved for the passed in reqName, empty if there was no + reservation found. + - error: non-nil in case of any errors. 
+*/ +func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, + journalPool, reqName, namePrefix string, +) (*VolumeGroupData, error) { + var ( + cj = sgj.Config + volGroupData = &VolumeGroupData{} + ) + + // check if request name is already part of the directory omap + fetchKeys := []string{ + cj.csiNameKeyPrefix + reqName, + } + values, err := getOMapValues( + ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + cj.commonPrefix, fetchKeys) + if err != nil { + if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) { + // pool or omap (oid) was not present + // stop processing but without an error for no reservation exists + return nil, nil + } + + return nil, err + } + + objUUID, found := values[cj.csiNameKeyPrefix+reqName] + if !found { + // omap was read but was missing the desired key-value pair + // stop processing but without an error for no reservation exists + return nil, nil + } + volGroupData.GroupUUID = objUUID + + savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool, + objUUID) + if err != nil { + // error should specifically be not found, for image to be absent, any other error + // is not conclusive, and we should not proceed + if errors.Is(err, util.ErrKeyNotFound) { + err = sgj.UndoReservation(ctx, journalPool, + generateVolumeGroupName(namePrefix, objUUID), reqName) + } + + return nil, err + } + + // check if the request name in the omap matches the passed in request name + if savedVolumeGroupAttributes.RequestName != reqName { + // NOTE: This should never be possible, hence no cleanup, but log error + // and return, as cleanup may need to occur manually! 
+ return nil, fmt.Errorf("internal state inconsistent, omap names mismatch,"+ + " request name (%s) volume group UUID (%s) volume group omap name (%s)", + reqName, objUUID, savedVolumeGroupAttributes.RequestName) + } + volGroupData.GroupName = savedVolumeGroupAttributes.GroupName + volGroupData.VolumeGroupAttributes = &VolumeGroupAttributes{} + volGroupData.VolumeGroupAttributes.RequestName = savedVolumeGroupAttributes.RequestName + volGroupData.VolumeGroupAttributes.VolumeSnapshotMap = savedVolumeGroupAttributes.VolumeSnapshotMap + + return volGroupData, nil +} + +/* +UndoReservation undoes a reservation, in the reverse order of ReserveName +- The UUID directory is cleaned up before the GroupName key in the csiDirectory is cleaned up + +NOTE: Ensure that the Ceph volume snapshots backing the reservation is cleaned up +prior to cleaning up the reservation + +NOTE: As the function manipulates omaps, it should be called with a lock against the request name +held, to prevent parallel operations from modifying the state of the omaps for this request name. 
+ +Input arguments: + - csiJournalPool: Pool name that holds the CSI request name based journal + - groupID: ID of the volume group, generated from the UUID + - reqName: Request name for the volume group +*/ +func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, + csiJournalPool, groupID, reqName string, +) error { + // delete volume UUID omap (first, inverse of create order) + cj := sgj.Config + if groupID != "" { + if len(groupID) < uuidEncodedLength { + return fmt.Errorf("unable to parse UUID from %s, too short", groupID) + } + + groupUUID := groupID[len(groupID)-36:] + if _, err := uuid.Parse(groupUUID); err != nil { + return fmt.Errorf("failed parsing UUID in %s: %w", groupUUID, err) + } + + err := util.RemoveObject( + ctx, + sgj.Connection.monitors, + sgj.Connection.cr, + csiJournalPool, + cj.namespace, + cj.cephUUIDDirectoryPrefix+groupUUID) + if err != nil { + if !errors.Is(err, util.ErrObjectNotFound) { + log.ErrorLog(ctx, "failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+groupUUID, err) + + return err + } + } + } + + // delete the request name key (last, inverse of create order) + err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory, + []string{cj.csiNameKeyPrefix + reqName}) + if err != nil { + log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) + } + + return err +} + +/* +ReserveName adds respective entries to the csiDirectory omaps, post generating a target +UUIDDirectory for use. Further, these functions update the UUIDDirectory omaps, to store back +pointers to the CSI generated request names. + +NOTE: As the function manipulates omaps, it should be called with a lock against the request name +held, to prevent parallel operations from modifying the state of the omaps for this request name. 
+ +Input arguments: + - journalPool: Pool where the CSI journal is stored + - journalPoolID: pool ID of the journalPool + - reqName: Name of the volumeGroupSnapshot request received + - namePrefix: Prefix to use when generating the volumeGroupName name (suffix is an auto-generated UUID) + +Return values: + - string: Contains the UUID that was reserved for the passed in reqName + - string: Contains the VolumeGroup name that was reserved for the passed in reqName + - error: non-nil in case of any errors +*/ +func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, + journalPool string, journalPoolID int64, + reqName, namePrefix string, +) (string, string, error) { + cj := sgj.Config + + // Create the UUID based omap first, to reserve the same and avoid conflicts + // NOTE: If any service loss occurs post creation of the UUID directory, and before + // setting the request name key to point back to the UUID directory, the + // UUID directory key will be leaked + objUUID, err := reserveOMapName( + ctx, + sgj.Connection.monitors, + sgj.Connection.cr, + journalPool, + cj.namespace, + cj.cephUUIDDirectoryPrefix, + "") + if err != nil { + return "", "", err + } + groupName := generateVolumeGroupName(namePrefix, objUUID) + nameKeyVal := objUUID + // After generating the UUID Directory omap, we populate the csiDirectory + // omap with a key-value entry to map the request to the backend volume group: + // `csiNameKeyPrefix + reqName: nameKeyVal` + err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal}) + if err != nil { + return "", "", err + } + defer func() { + if err != nil { + log.WarningLog(ctx, "reservation failed for volume group: %s", reqName) + errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName) + if errDefer != nil { + log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer) + } + } + }() + + oid := 
cj.cephUUIDDirectoryPrefix + objUUID + omapValues := map[string]string{} + + // Update UUID directory to store CSI request name + omapValues[cj.csiNameKey] = reqName + omapValues[cj.csiImageKey] = groupName + + err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues) + if err != nil { + return "", "", err + } + + return objUUID, groupName, nil +} + +// VolumeGroupAttributes contains the request name and the volumeID's and +// the corresponding snapshotID's. +type VolumeGroupAttributes struct { + RequestName string // Contains the request name for the passed in UUID + GroupName string // Contains the group name + VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping +} + +func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( + ctx context.Context, + pool, objectUUID string, +) (*VolumeGroupAttributes, error) { + var ( + err error + groupAttributes = &VolumeGroupAttributes{} + cj = sgj.Config + ) + + values, err := listOMapValues( + ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, + cj.commonPrefix) + if err != nil { + if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { + return nil, err + } + log.WarningLog(ctx, "unable to read omap values: pool missing: %v", err) + } + + groupAttributes.RequestName = values[cj.csiNameKey] + groupAttributes.GroupName = values[cj.csiImageKey] + + // Remove request name key and group name key from the omap, as we are + // looking for volumeID/snapshotID mapping + delete(values, cj.csiNameKey) + delete(values, cj.csiImageKey) + groupAttributes.VolumeSnapshotMap = map[string]string{} + for k, v := range values { + groupAttributes.VolumeSnapshotMap[k] = v + } + + return groupAttributes, nil +} + +func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( + ctx context.Context, + pool, + reservedUUID, + volumeID, + snapshotID string, +) error { + err := setOMapKeys(ctx, sgj.Connection, pool, 
sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + map[string]string{volumeID: snapshotID}) + if err != nil { + log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err) + + return err + } + + return nil +} + +func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping( + ctx context.Context, + pool, + reservedUUID, + volumeID string, +) error { + err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + []string{volumeID}) + if err != nil { + log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err) + + return err + } + + return nil +}