cephfs: connect to the Ceph Cluster with go-ceph

Add the ClusterConnection to the volumeOptions type, so that future use
of go-ceph can connect to the Ceph cluster.

Once a volumeOptions object is not needed anymore, it needs to get
destroyed to free associated resources like the ClusterConnection.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2020-08-05 12:10:04 +02:00 committed by mergify[bot]
parent da056a5ef6
commit 0f108edc7b
3 changed files with 45 additions and 1 deletions

View File

@ -156,6 +156,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
util.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) util.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
defer volOptions.Destroy()
if req.GetCapacityRange() != nil { if req.GetCapacityRange() != nil {
volOptions.Size = util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes()) volOptions.Size = util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes())
@ -166,6 +167,10 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil { if err != nil {
return nil, err return nil, err
} }
if parentVol != nil {
defer parentVol.Destroy()
}
vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr) vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
if err != nil { if err != nil {
if errors.Is(err, ErrCloneInProgress) { if errors.Is(err, ErrCloneInProgress) {
@ -325,6 +330,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
defer volOptions.Destroy()
// lock out parallel delete and create requests against the same volume name as we // lock out parallel delete and create requests against the same volume name as we
// cleanup the subvolume and associated omaps for the same // cleanup the subvolume and associated omaps for the same
@ -410,11 +416,11 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
volOptions, volIdentifier, err := newVolumeOptionsFromVolID(ctx, volID, nil, secret) volOptions, volIdentifier, err := newVolumeOptionsFromVolID(ctx, volID, nil, secret)
if err != nil { if err != nil {
util.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) util.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
defer volOptions.Destroy()
RoundOffSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes()) RoundOffSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes())
@ -476,6 +482,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
} }
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
defer parentVolOptions.Destroy()
if clusterData.ClusterID != parentVolOptions.ClusterID { if clusterData.ClusterID != parentVolOptions.ClusterID {
return nil, status.Errorf(codes.InvalidArgument, "requested cluster id %s not matching subvolume cluster id %s", clusterData.ClusterID, parentVolOptions.ClusterID) return nil, status.Errorf(codes.InvalidArgument, "requested cluster id %s not matching subvolume cluster id %s", clusterData.ClusterID, parentVolOptions.ClusterID)

View File

@ -107,6 +107,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
} }
} }
defer volOptions.Destroy()
// Check if the volume is already mounted // Check if the volume is already mounted

View File

@ -48,6 +48,32 @@ type volumeOptions struct {
FuseMountOptions string `json:"fuseMountOptions"` FuseMountOptions string `json:"fuseMountOptions"`
SubvolumeGroup string SubvolumeGroup string
Features []string Features []string
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection
}
// Connect a CephFS volume to the Ceph cluster.
func (vo *volumeOptions) Connect(cr *util.Credentials) error {
if vo.conn != nil {
return nil
}
conn := &util.ClusterConnection{}
if err := conn.Connect(vo.Monitors, cr); err != nil {
return err
}
vo.conn = conn
return nil
}
// Destroy cleans up the CephFS volume object and closes the connection to the
// Ceph cluster in case one was setup.
func (vo *volumeOptions) Destroy() {
if vo.conn != nil {
vo.conn.Destroy()
}
} }
func validateNonEmptyField(field, fieldName string) error { func validateNonEmptyField(field, fieldName string) error {
@ -190,6 +216,11 @@ func newVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVo
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
err = opts.Connect(cr)
if err != nil {
return nil, err
}
opts.FscID, err = getFscID(ctx, opts.Monitors, cr, opts.FsName) opts.FscID, err = getFscID(ctx, opts.Monitors, cr, opts.FsName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -253,6 +284,11 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
err = volOptions.Connect(cr)
if err != nil {
return nil, nil, err
}
volOptions.FsName, err = getFsName(ctx, volOptions.Monitors, cr, volOptions.FscID) volOptions.FsName, err = getFsName(ctx, volOptions.Monitors, cr, volOptions.FscID)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err