cleanup: move pkg/ to internal/

The internal/ directory in Go has a special meaning, and indicates that
those packages are not meant for external consumption. Ceph-CSI does not
provide public APIs for other projects to consume, and there is no plan
to keep the API of the internally used packages stable.
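
For illustration, a hypothetical external module that tries to import one
of the moved packages now fails to build with "use of internal package
github.com/ceph/ceph-csi/internal/util not allowed":

    package main

    // Rejected by the Go toolchain from outside the ceph-csi module.
    import "github.com/ceph/ceph-csi/internal/util"

    func main() {
        _ = util.CephConfigPath
    }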

Closes: #903
Signed-off-by: Niels de Vos <ndevos@redhat.com>
Niels de Vos authored 2020-04-17 11:23:49 +02:00
committed by mergify[bot]
parent d0abc3f5e6
commit 32839948ef
64 changed files with 37 additions and 37 deletions


@@ -1,58 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"errors"
"k8s.io/klog"
)
// ForAllFunc is a unary predicate for visiting all cache entries
// matching the `pattern' in CachePersister's ForAll function.
type ForAllFunc func(identifier string) error
// CacheEntryNotFound is an error type for "Not Found" cache errors
type CacheEntryNotFound struct {
error
}
// CachePersister interface implemented for store
type CachePersister interface {
Create(identifier string, data interface{}) error
Get(identifier string, data interface{}) error
ForAll(pattern string, destObj interface{}, f ForAllFunc) error
Delete(identifier string) error
}
// NewCachePersister returns CachePersister based on store
func NewCachePersister(metadataStore, pluginPath string) (CachePersister, error) {
if metadataStore == "k8s_configmap" {
klog.V(4).Infof("cache-perister: using kubernetes configmap as metadata cache persister")
k8scm := &K8sCMCache{}
k8scm.Client = NewK8sClient()
k8scm.Namespace = GetK8sNamespace()
return k8scm, nil
} else if metadataStore == "node" {
klog.V(4).Infof("cache-persister: using node as metadata cache persister")
nc := &NodeCache{}
nc.BasePath = pluginPath
nc.CacheDir = "controller"
return nc, nil
}
return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
}
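
As context for the interface above, a minimal usage sketch from a hypothetical package inside this repository (store name and plugin path are illustrative):

package example

import (
    "github.com/ceph/ceph-csi/internal/util"
    "k8s.io/klog"
)

func persistExample() {
    // "k8s_configmap" selects the configmap-backed store; "node" is the alternative.
    cache, err := util.NewCachePersister("k8s_configmap", "/var/lib/kubelet/plugins/")
    if err != nil {
        klog.Fatalf("failed to create cache persister: %v", err)
    }
    // Persist any JSON-serializable object under an identifier.
    if err := cache.Create("csi.volume.example", map[string]string{"pool": "replicapool"}); err != nil {
        klog.Errorf("failed to persist metadata: %v", err)
    }
}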


@@ -1,355 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"strings"
"k8s.io/klog"
)
// InvalidPoolID used to denote an invalid pool
const InvalidPoolID int64 = -1
// ExecCommand executes the passed-in program with the given args and returns separate stdout and stderr streams
func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var (
cmd = exec.Command(program, args...) // nolint: gosec, #nosec
sanitizedArgs = StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
if err := cmd.Run(); err != nil {
return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
" occurred while running %s args: %v", err, program, sanitizedArgs)
}
return stdoutBuf.Bytes(), nil, nil
}
// cephStoragePoolSummary strongly typed JSON spec for osd ls pools output
type cephStoragePoolSummary struct {
Name string `json:"poolname"`
Number int64 `json:"poolnum"`
}
// getPools fetches a list of pools from a cluster
func getPools(ctx context.Context, monitors string, cr *Credentials) ([]cephStoragePoolSummary, error) {
// ceph <options> -f json osd lspools
// JSON out: [{"poolnum":<int64>,"poolname":<string>}]
stdout, _, err := ExecCommand(
"ceph",
"-m", monitors,
"--id", cr.ID,
"--keyfile="+cr.KeyFile,
"-c", CephConfigPath,
"-f", "json",
"osd", "lspools")
if err != nil {
klog.Errorf(Log(ctx, "failed getting pool list from cluster (%s)"), err)
return nil, err
}
var pools []cephStoragePoolSummary
err = json.Unmarshal(stdout, &pools)
if err != nil {
klog.Errorf(Log(ctx, "failed to parse JSON output of pool list from cluster (%s)"), err)
return nil, fmt.Errorf("unmarshal of pool list failed: %+v. raw buffer response: %s", err, string(stdout))
}
return pools, nil
}
// GetPoolID searches a list of pools in a cluster and returns the ID of the pool that matches
// the passed-in poolName parameter
func GetPoolID(ctx context.Context, monitors string, cr *Credentials, poolName string) (int64, error) {
pools, err := getPools(ctx, monitors, cr)
if err != nil {
return 0, err
}
for _, p := range pools {
if poolName == p.Name {
return p.Number, nil
}
}
return 0, fmt.Errorf("pool (%s) not found in Ceph cluster", poolName)
}
// GetPoolName lists all pools in a ceph cluster, and matches the pool whose pool ID is equal to
// the requested poolID parameter
func GetPoolName(ctx context.Context, monitors string, cr *Credentials, poolID int64) (string, error) {
pools, err := getPools(ctx, monitors, cr)
if err != nil {
return "", err
}
for _, p := range pools {
if poolID == p.Number {
return p.Name, nil
}
}
return "", ErrPoolNotFound{string(poolID), fmt.Errorf("pool ID (%d) not found in Ceph cluster", poolID)}
}
// GetPoolIDs searches a list of pools in a cluster and returns the IDs of the pools that match
// the passed-in pools
// TODO this should take in a list and return a map[string(poolname)]int64(poolID)
func GetPoolIDs(ctx context.Context, monitors, journalPool, imagePool string, cr *Credentials) (int64, int64, error) {
journalPoolID, err := GetPoolID(ctx, monitors, cr, journalPool)
if err != nil {
return InvalidPoolID, InvalidPoolID, err
}
imagePoolID := journalPoolID
if imagePool != journalPool {
imagePoolID, err = GetPoolID(ctx, monitors, cr, imagePool)
if err != nil {
return InvalidPoolID, InvalidPoolID, err
}
}
return journalPoolID, imagePoolID, nil
}
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name
func SetOMapKeyValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error {
// Command: "rados <options> setomapval oMapName oMapKey keyValue"
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"-p", poolName,
"setomapval", oMapName, oMapKey, keyValue,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, _, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf(Log(ctx, "failed adding key (%s with value %s), to omap (%s) in "+
"pool (%s): (%v)"), oMapKey, keyValue, oMapName, poolName, err)
return err
}
return nil
}
// GetOMapValue gets the value for the given key from the named omap
func GetOMapValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) (string, error) {
// Command: "rados <options> getomapval oMapName oMapKey <outfile>"
// No such key: replicapool/csi.volumes.directory.default/csi.volname
tmpFile, err := ioutil.TempFile("", "omap-get-")
if err != nil {
klog.Errorf(Log(ctx, "failed creating a temporary file for key contents"))
return "", err
}
defer tmpFile.Close()
defer os.Remove(tmpFile.Name())
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"-p", poolName,
"getomapval", oMapName, oMapKey, tmpFile.Name(),
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
stdout, stderr, err := ExecCommand("rados", args[:]...)
if err != nil {
// no logs, as attempting to check for non-existent key/value is done even on
// regular call sequences
stdoutanderr := strings.Join([]string{string(stdout), string(stderr)}, " ")
if strings.Contains(stdoutanderr, "No such key: "+poolName+"/"+oMapName+"/"+oMapKey) {
return "", ErrKeyNotFound{poolName + "/" + oMapName + "/" + oMapKey, err}
}
if strings.Contains(stdoutanderr, "error getting omap value "+
poolName+"/"+oMapName+"/"+oMapKey+": (2) No such file or directory") {
return "", ErrKeyNotFound{poolName + "/" + oMapName + "/" + oMapKey, err}
}
if strings.Contains(stdoutanderr, "error opening pool "+
poolName+": (2) No such file or directory") {
return "", ErrPoolNotFound{poolName, err}
}
// log other errors for troubleshooting assistance
klog.Errorf(Log(ctx, "failed getting omap value for key (%s) from omap (%s) in pool (%s): (%v)"),
oMapKey, oMapName, poolName, err)
return "", fmt.Errorf("error (%v) occurred, command output streams is (%s)",
err.Error(), stdoutanderr)
}
keyValue, err := ioutil.ReadAll(tmpFile)
return string(keyValue), err
}
// RemoveOMapKey removes the omap key from the given omap name
func RemoveOMapKey(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey string) error {
// Command: "rados <options> rmomapkey oMapName oMapKey"
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"-p", poolName,
"rmomapkey", oMapName, oMapKey,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, _, err := ExecCommand("rados", args[:]...)
if err != nil {
// NOTE: Missing omap key removal does not return an error
klog.Errorf(Log(ctx, "failed removing key (%s), from omap (%s) in "+
"pool (%s): (%v)"), oMapKey, oMapName, poolName, err)
return err
}
return nil
}
// CreateObject creates the object name passed in and returns ErrObjectExists if the provided object
// is already present in rados
func CreateObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, objectName string) error {
// Command: "rados <options> create objectName"
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"-p", poolName,
"create", objectName,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, stderr, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf(Log(ctx, "failed creating omap (%s) in pool (%s): (%v)"), objectName, poolName, err)
if strings.Contains(string(stderr), "error creating "+poolName+"/"+objectName+
": (17) File exists") {
return ErrObjectExists{objectName, err}
}
return err
}
return nil
}
// RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound if the provided omap
// is not found in rados
func RemoveObject(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName string) error {
// Command: "rados <options> rm oMapName"
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"-p", poolName,
"rm", oMapName,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, stderr, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf(Log(ctx, "failed removing omap (%s) in pool (%s): (%v)"), oMapName, poolName, err)
if strings.Contains(string(stderr), "error removing "+poolName+">"+oMapName+
": (2) No such file or directory") {
return ErrObjectNotFound{oMapName, err}
}
return err
}
return nil
}
// SetImageMeta sets image metadata
func SetImageMeta(ctx context.Context, cr *Credentials, monitors, imageSpec, key, value string) error {
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"image-meta", "set", imageSpec,
key, value,
}
_, _, err := ExecCommand("rbd", args[:]...)
if err != nil {
klog.Errorf(Log(ctx, "failed setting image metadata (%s) for (%s): (%v)"), key, imageSpec, err)
return err
}
return nil
}
// GetImageMeta gets image metadata
func GetImageMeta(ctx context.Context, cr *Credentials, monitors, imageSpec, key string) (string, error) {
args := []string{
"-m", monitors,
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-c", CephConfigPath,
"image-meta", "get", imageSpec,
key,
}
stdout, stderr, err := ExecCommand("rbd", args[:]...)
if err != nil {
stdoutanderr := strings.Join([]string{string(stdout), string(stderr)}, " ")
if strings.Contains(stdoutanderr, "failed to get metadata "+key+" of image : (2) No such file or directory") {
return "", ErrKeyNotFound{imageSpec + " " + key, err}
}
klog.Errorf(Log(ctx, "failed getting image metadata (%s) for (%s): (%v)"), key, imageSpec, err)
return "", err
}
return string(stdout), nil
}
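
A short sketch of how the omap helpers above compose, from a hypothetical caller inside this repository (pool, omap, and key names are illustrative):

package example

import (
    "context"

    "github.com/ceph/ceph-csi/internal/util"
)

// storeAndFetch round-trips one key through a rados omap.
func storeAndFetch(ctx context.Context, monitors string, cr *util.Credentials) (string, error) {
    err := util.SetOMapKeyValue(ctx, monitors, cr, "replicapool", "", "csi.volumes.default", "csi.volume.name", "pvc-1234")
    if err != nil {
        return "", err
    }
    value, err := util.GetOMapValue(ctx, monitors, cr, "replicapool", "", "csi.volumes.default", "csi.volume.name")
    if _, ok := err.(util.ErrKeyNotFound); ok {
        // missing keys are expected on regular call sequences; treat as "no entry"
        return "", nil
    }
    return value, err
}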


@@ -1,71 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"io/ioutil"
"os"
)
var cephConfig = []byte(`[global]
auth_cluster_required = cephx
auth_service_required = cephx
auth_client_required = cephx
# Workaround for http://tracker.ceph.com/issues/23446
fuse_set_user_groups = false
`)
const (
cephConfigRoot = "/etc/ceph"
// CephConfigPath ceph configuration file
CephConfigPath = "/etc/ceph/ceph.conf"
keyRing = "/etc/ceph/keyring"
)
func createCephConfigRoot() error {
return os.MkdirAll(cephConfigRoot, 0755) // #nosec
}
// WriteCephConfig writes out a basic ceph.conf file, making it easy to use
// ceph-related CLIs
func WriteCephConfig() error {
if err := createCephConfigRoot(); err != nil {
return err
}
err := ioutil.WriteFile(CephConfigPath, cephConfig, 0640)
if err != nil {
return err
}
return createKeyRingFile()
}
/*
if any ceph command fails, it will log the below error message
7f39ff02a700 -1 auth: unable to find a keyring on
/etc/ceph/ceph.client.admin.keyring,/etc/ceph/ceph.keyring,/etc/ceph/keyring,
/etc/ceph/keyring.bin,: (2) No such file or directory
*/
// createKeyRingFile creates an empty keyring file to avoid the above error message
func createKeyRingFile() error {
_, err := os.Create(keyRing)
return err
}


@@ -1,206 +0,0 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"io/ioutil"
"sync"
"time"
"github.com/ceph/go-ceph/rados"
"github.com/pkg/errors"
)
type connEntry struct {
conn *rados.Conn
lastUsed time.Time
users int
}
// ConnPool is the struct which contains details of connection entries in the pool and the gc-controlled parameters.
type ConnPool struct {
// interval to run the garbage collector
interval time.Duration
// timeout for a connEntry to get garbage collected
expiry time.Duration
// Timer used to schedule calls to the garbage collector
timer *time.Timer
// Mutex for loading and touching connEntry's from the conns Map
lock *sync.RWMutex
// all connEntry's in this pool
conns map[string]*connEntry
}
// NewConnPool creates a new connection pool instance and starts the garbage collector running
// every @interval.
func NewConnPool(interval, expiry time.Duration) *ConnPool {
cp := ConnPool{
interval: interval,
expiry: expiry,
lock: &sync.RWMutex{},
conns: make(map[string]*connEntry),
}
cp.timer = time.AfterFunc(interval, cp.gc)
return &cp
}
// gc() loops through all cp.conns and destroys objects that have not been used for cp.expiry.
func (cp *ConnPool) gc() {
cp.lock.Lock()
defer cp.lock.Unlock()
now := time.Now()
for key, ce := range cp.conns {
if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry {
ce.destroy()
delete(cp.conns, key)
}
}
// schedule the next gc() run
cp.timer.Reset(cp.interval)
}
// Destroy stops the garbage collector and destroys all connections in the pool.
func (cp *ConnPool) Destroy() {
cp.timer.Stop()
// wait until gc() has finished, in case it is running
cp.lock.Lock()
defer cp.lock.Unlock()
for key, ce := range cp.conns {
if ce.users != 0 {
panic("this connEntry still has users, operations" +
" might still be in-flight")
}
ce.destroy()
delete(cp.conns, key)
}
}
func (cp *ConnPool) generateUniqueKey(pool, monitors, user, keyfile string) (string, error) {
// the keyfile path may differ between operations, but its contents will be the same
key, err := ioutil.ReadFile(keyfile) // nolint: gosec, #nosec
if err != nil {
return "", errors.Wrapf(err, "could not open keyfile %s", keyfile)
}
return fmt.Sprintf("%s|%s|%s|%s", pool, monitors, user, string(key)), nil
}
// getConn returns the existing rados.Conn associated with the unique key.
//
// Requires: locked cp.lock because of ce.get()
func (cp *ConnPool) getConn(unique string) *rados.Conn {
ce, exists := cp.conns[unique]
if exists {
ce.get()
return ce.conn
}
return nil
}
// Get returns a rados.Conn for the given arguments. Creates a new rados.Conn in
// case there is none in the pool. Use the returned unique string to reduce the
// reference count with ConnPool.Put(unique).
func (cp *ConnPool) Get(pool, monitors, user, keyfile string) (*rados.Conn, error) {
unique, err := cp.generateUniqueKey(pool, monitors, user, keyfile)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate unique key for connection")
}
cp.lock.RLock()
conn := cp.getConn(unique)
cp.lock.RUnlock()
if conn != nil {
return conn, nil
}
// construct and connect a new rados.Conn
args := []string{"-m", monitors, "--keyfile=" + keyfile}
conn, err = rados.NewConnWithUser(user)
if err != nil {
return nil, errors.Wrapf(err, "creating a new connection failed")
}
err = conn.ParseCmdLineArgs(args)
if err != nil {
return nil, errors.Wrapf(err, "parsing cmdline args (%v) failed", args)
}
err = conn.Connect()
if err != nil {
return nil, errors.Wrapf(err, "connecting failed")
}
ce := &connEntry{
conn: conn,
lastUsed: time.Now(),
users: 1,
}
cp.lock.Lock()
defer cp.lock.Unlock()
oldConn := cp.getConn(unique)
if oldConn != nil {
// there was a race, oldConn already exists
ce.destroy()
return oldConn, nil
}
// this really is a new connection, add it to the map
cp.conns[unique] = ce
return conn, nil
}
// Put reduces the reference count of the rados.Conn object that was returned with
// ConnPool.Get().
func (cp *ConnPool) Put(conn *rados.Conn) {
cp.lock.Lock()
defer cp.lock.Unlock()
for _, ce := range cp.conns {
if ce.conn == conn {
ce.put()
return
}
}
}
// Add a reference to the connEntry.
// /!\ Only call this while holding the ConnPool.lock.
func (ce *connEntry) get() {
ce.lastUsed = time.Now()
ce.users++
}
// Reduce number of references. If this returns true, there are no more users.
// /!\ Only call this while holding the ConnPool.lock.
func (ce *connEntry) put() {
ce.users--
// do not call ce.destroy(), let ConnPool.gc() do that
}
// Destroy a connEntry object, close the connection to the Ceph cluster.
func (ce *connEntry) destroy() {
if ce.conn != nil {
ce.conn.Shutdown()
ce.conn = nil
}
}
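
To show the intended reference counting, a minimal sketch from a hypothetical caller (durations and arguments are illustrative):

package example

import (
    "time"

    "github.com/ceph/ceph-csi/internal/util"
)

func getAndRelease(pool, monitors, user, keyfile string) error {
    cp := util.NewConnPool(15*time.Minute, 10*time.Minute)
    defer cp.Destroy()

    conn, err := cp.Get(pool, monitors, user, keyfile)
    if err != nil {
        return err
    }
    // drop the reference again; gc() destroys idle connections after the expiry
    defer cp.Put(conn)

    // use conn for rados operations here
    return nil
}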


@@ -1,178 +0,0 @@
/*
Copyright 2020 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/ceph/go-ceph/rados"
)
const (
interval = 15 * time.Minute
expiry = 10 * time.Minute
)
// fakeGet is used as a replacement for ConnPool.Get and does not need a
// working Ceph cluster to connect to.
//
// This is mostly a copy of ConnPool.Get()
func (cp *ConnPool) fakeGet(pool, monitors, user, keyfile string) (*rados.Conn, string, error) {
unique, err := cp.generateUniqueKey(pool, monitors, user, keyfile)
if err != nil {
return nil, "", err
}
// need a lock while calling ce.get()
cp.lock.RLock()
conn := cp.getConn(unique)
cp.lock.RUnlock()
if conn != nil {
return conn, unique, nil
}
// cp.Get() creates and connects a rados.Conn here
conn, err = rados.NewConn()
if err != nil {
return nil, "", err
}
ce := &connEntry{
conn: conn,
lastUsed: time.Now(),
users: 1,
}
cp.lock.Lock()
defer cp.lock.Unlock()
oldConn := cp.getConn(unique)
if oldConn != nil {
// there was a race, oldConn already exists
ce.destroy()
return oldConn, unique, nil
}
// this really is a new connection, add it to the map
cp.conns[unique] = ce
return conn, unique, nil
}
func TestConnPool(t *testing.T) {
cp := NewConnPool(interval, expiry)
defer cp.Destroy()
// create a keyfile with some contents
keyfile := "/tmp/conn_utils.keyfile"
err := ioutil.WriteFile(keyfile, []byte("the-key"), 0600)
if err != nil {
t.Errorf("failed to create keyfile: %v", err)
return
}
defer os.Remove(keyfile)
var conn *rados.Conn
var unique string
t.Run("fakeGet", func(t *testing.T) {
conn, unique, err = cp.fakeGet("pool", "monitors", "user", keyfile)
if err != nil {
t.Errorf("failed to get connection: %v", err)
}
// prevent goanalysis_metalinter from complaining about unused conn
_ = conn
// there should be a single item in cp.conns
if len(cp.conns) != 1 {
t.Errorf("there is more than a single conn in cp.conns: %v", len(cp.conns))
}
// the ce should have a single user
ce, exists := cp.conns[unique]
if !exists {
t.Errorf("getting the conn from cp.conns failed")
}
if ce.users != 1 {
t.Errorf("there should only be one user: %v", ce.users)
}
})
t.Run("doubleFakeGet", func(t *testing.T) {
// after a 2nd get, there should still be a single conn in cp.conns
_, _, err = cp.fakeGet("pool", "monitors", "user", keyfile)
if err != nil {
t.Errorf("failed to get connection: %v", err)
}
if len(cp.conns) != 1 {
t.Errorf("a second conn was added to cp.conns: %v", len(cp.conns))
}
// the ce should have two users
ce, exists := cp.conns[unique]
if !exists {
t.Errorf("getting the conn from cp.conns failed")
}
if ce.users != 2 {
t.Errorf("there should be two users: %v", ce.users)
}
cp.Put(ce.conn)
if len(cp.conns) != 1 {
t.Errorf("a single put should not remove all cp.conns: %v", len(cp.conns))
}
// the ce should have a single user again
ce, exists = cp.conns[unique]
if !exists {
t.Errorf("getting the conn from cp.conns failed")
}
if ce.users != 1 {
t.Errorf("There should only be one user: %v", ce.users)
}
})
// there is still one conn in cp.conns after "doubleFakeGet"
t.Run("garbageCollection", func(t *testing.T) {
// timeout has not occurred yet, so number of conns in the list should stay the same
cp.gc()
if len(cp.conns) != 1 {
t.Errorf("gc() should not have removed any entry from cp.conns: %v", len(cp.conns))
}
// force expiring the ConnEntry by fetching it and adjusting .lastUsed
ce, exists := cp.conns[unique]
if !exists {
t.Error("getting the conn from cp.conns failed")
}
ce.lastUsed = ce.lastUsed.Add(-2 * expiry)
if ce.users != 1 {
t.Errorf("There should only be one user: %v", ce.users)
}
cp.Put(ce.conn)
if ce.users != 0 {
t.Errorf("There should be no users anymore: %v", ce.users)
}
// timeout has occurred now, so number of conns in the list should be less
cp.gc()
if len(cp.conns) != 0 {
t.Errorf("gc() should have removed an entry from cp.conns: %v", len(cp.conns))
}
})
}


@@ -1,126 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"errors"
"fmt"
"io/ioutil"
"os"
)
const (
credUserID = "userID"
credUserKey = "userKey"
credAdminID = "adminID"
credAdminKey = "adminKey"
credMonitors = "monitors"
tmpKeyFileLocation = "/tmp/csi/keys"
tmpKeyFileNamePrefix = "keyfile-"
)
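// Credentials holds the ID of a Ceph user and the path to a temporary
// keyfile that contains the user's key.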
type Credentials struct {
ID string
KeyFile string
}
func storeKey(key string) (string, error) {
tmpfile, err := ioutil.TempFile(tmpKeyFileLocation, tmpKeyFileNamePrefix)
if err != nil {
return "", fmt.Errorf("error creating a temporary keyfile (%s)", err)
}
defer func() {
if err != nil {
// don't complain about unhandled error
_ = os.Remove(tmpfile.Name())
}
}()
if _, err = tmpfile.Write([]byte(key)); err != nil {
return "", fmt.Errorf("error writing key to temporary keyfile (%s)", err)
}
keyFile := tmpfile.Name()
if keyFile == "" {
err = fmt.Errorf("error reading temporary filename for key (%s)", err)
return "", err
}
if err = tmpfile.Close(); err != nil {
return "", fmt.Errorf("error closing temporary filename (%s)", err)
}
return keyFile, nil
}
func newCredentialsFromSecret(idField, keyField string, secrets map[string]string) (*Credentials, error) {
var (
c = &Credentials{}
ok bool
)
if len(secrets) == 0 {
return nil, errors.New("provided secret is empty")
}
if c.ID, ok = secrets[idField]; !ok {
return nil, fmt.Errorf("missing ID field '%s' in secrets", idField)
}
key := secrets[keyField]
if key == "" {
return nil, fmt.Errorf("missing key field '%s' in secrets", keyField)
}
keyFile, err := storeKey(key)
if err == nil {
c.KeyFile = keyFile
}
return c, err
}
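// DeleteCredentials removes the temporary keyfile that was created for these credentials.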
func (cr *Credentials) DeleteCredentials() {
// don't complain about unhandled error
_ = os.Remove(cr.KeyFile)
}
func NewUserCredentials(secrets map[string]string) (*Credentials, error) {
return newCredentialsFromSecret(credUserID, credUserKey, secrets)
}
func NewAdminCredentials(secrets map[string]string) (*Credentials, error) {
return newCredentialsFromSecret(credAdminID, credAdminKey, secrets)
}
func NewCredentials(id, key string) (*Credentials, error) {
var c = &Credentials{}
c.ID = id
keyFile, err := storeKey(key)
if err == nil {
c.KeyFile = keyFile
}
return c, err
}
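// GetMonValFromSecret returns the monitors value from the secrets map.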
func GetMonValFromSecret(secrets map[string]string) (string, error) {
if mons, ok := secrets[credMonitors]; ok {
return mons, nil
}
return "", fmt.Errorf("missing %q", credMonitors)
}
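
The intended lifecycle of these helpers, as a minimal sketch (hypothetical caller; the secrets map comes from the CSI request):

package example

import "github.com/ceph/ceph-csi/internal/util"

func withUserCredentials(secrets map[string]string) error {
    cr, err := util.NewUserCredentials(secrets)
    if err != nil {
        return err
    }
    // the temporary keyfile under /tmp/csi/keys must be cleaned up again
    defer cr.DeleteCredentials()

    // cr.ID and cr.KeyFile can now be passed to the ceph/rados/rbd wrappers
    return nil
}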


@@ -1,274 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"path"
"strings"
"github.com/pkg/errors"
"crypto/rand"
"k8s.io/klog"
)
const (
mapperFilePrefix = "luks-rbd-"
mapperFilePathPrefix = "/dev/mapper"
// image metadata key for encryption
encryptionMetaKey = ".rbd.csi.ceph.com/encrypted"
// Encryption passphrase location in K8s secrets
encryptionPassphraseKey = "encryptionPassphrase"
kmsTypeKey = "encryptionKMSType"
// Default KMS type
defaultKMSType = "default"
// kmsConfigPath is the location of the vault config file
kmsConfigPath = "/etc/ceph-csi-encryption-kms-config/config.json"
// Passphrase size - 20 bytes is 160 bits to satisfy:
// https://tools.ietf.org/html/rfc6749#section-10.10
encryptionPassphraseSize = 20
)
// EncryptionKMS provides an external Key Management System for encryption
// passphrase storage
type EncryptionKMS interface {
GetPassphrase(key string) (string, error)
SavePassphrase(key, value string) error
DeletePassphrase(key string) error
GetID() string
}
// MissingPassphrase is an error indicating that a new passphrase should be generated
type MissingPassphrase struct {
error
}
// SecretsKMS is the default KMS implementation, meaning no external KMS is in use
type SecretsKMS struct {
passphrase string
}
func initSecretsKMS(secrets map[string]string) (EncryptionKMS, error) {
passphraseValue, ok := secrets[encryptionPassphraseKey]
if !ok {
return nil, errors.New("missing encryption passphrase in secrets")
}
return SecretsKMS{passphrase: passphraseValue}, nil
}
// GetPassphrase returns passphrase from Kubernetes secrets
func (kms SecretsKMS) GetPassphrase(key string) (string, error) {
return kms.passphrase, nil
}
// SavePassphrase is not implemented
func (kms SecretsKMS) SavePassphrase(key, value string) error {
return fmt.Errorf("save new passphrase is not implemented for Kubernetes secrets")
}
// DeletePassphrase does nothing, as no new passphrases are saved with
// SecretsKMS
func (kms SecretsKMS) DeletePassphrase(key string) error {
return nil
}
// GetID returns the ID representing the default KMS (`default`)
func (kms SecretsKMS) GetID() string {
return defaultKMSType
}
// GetKMS returns an instance of Key Management System
func GetKMS(kmsID string, secrets map[string]string) (EncryptionKMS, error) {
if kmsID == "" || kmsID == defaultKMSType {
return initSecretsKMS(secrets)
}
// #nosec
content, err := ioutil.ReadFile(kmsConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to read kms configuration from %s: %s",
kmsConfigPath, err)
}
var config map[string]interface{}
err = json.Unmarshal(content, &config)
if err != nil {
return nil, fmt.Errorf("failed to parse kms configuration: %s", err)
}
kmsConfigData, ok := config[kmsID].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("missing encryption KMS configuration with %s", kmsID)
}
kmsConfig := make(map[string]string)
for key, value := range kmsConfigData {
kmsConfig[key], ok = value.(string)
if !ok {
return nil, fmt.Errorf("broken KMS config: '%s' for '%s' is not a string",
value, key)
}
}
kmsType, ok := kmsConfig[kmsTypeKey]
if !ok {
return nil, fmt.Errorf("encryption KMS configuration for %s is missing KMS type", kmsID)
}
if kmsType == "vault" {
return InitVaultKMS(kmsID, kmsConfig, secrets)
}
return nil, fmt.Errorf("unknown encryption KMS type %s", kmsType)
}
// GetCryptoPassphrase retrieves the passphrase to encrypt the volume
func GetCryptoPassphrase(ctx context.Context, volumeID string, kms EncryptionKMS) (string, error) {
passphrase, err := kms.GetPassphrase(volumeID)
if err == nil {
return passphrase, nil
}
if _, ok := err.(MissingPassphrase); ok {
klog.V(4).Infof(Log(ctx, "Encryption passphrase is missing for %s. Generating a new one"),
volumeID)
passphrase, err = generateNewEncryptionPassphrase()
if err != nil {
return "", fmt.Errorf("failed to generate passphrase for %s: %s", volumeID, err)
}
err = kms.SavePassphrase(volumeID, passphrase)
if err != nil {
return "", fmt.Errorf("failed to save the passphrase for %s: %s", volumeID, err)
}
return passphrase, nil
}
klog.Errorf(Log(ctx, "failed to get encryption passphrase for %s: %s"), volumeID, err)
return "", err
}
// generateNewEncryptionPassphrase generates a random passphrase for encryption
func generateNewEncryptionPassphrase() (string, error) {
bytesPassphrase := make([]byte, encryptionPassphraseSize)
_, err := rand.Read(bytesPassphrase)
if err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(bytesPassphrase), nil
}
// VolumeMapper returns the mapper file name and its path, where the encrypted device should be opened
func VolumeMapper(volumeID string) (mapperFile, mapperFilePath string) {
mapperFile = mapperFilePrefix + volumeID
mapperFilePath = path.Join(mapperFilePathPrefix, mapperFile)
return mapperFile, mapperFilePath
}
// EncryptVolume encrypts provided device with LUKS
func EncryptVolume(ctx context.Context, devicePath, passphrase string) error {
klog.V(4).Infof(Log(ctx, "Encrypting device %s with LUKS"), devicePath)
if _, _, err := LuksFormat(devicePath, passphrase); err != nil {
return errors.Wrapf(err, "failed to encrypt device %s with LUKS", devicePath)
}
return nil
}
// OpenEncryptedVolume opens volume so that it can be used by the client
func OpenEncryptedVolume(ctx context.Context, devicePath, mapperFile, passphrase string) error {
klog.V(4).Infof(Log(ctx, "Opening device %s with LUKS on %s"), devicePath, mapperFile)
_, _, err := LuksOpen(devicePath, mapperFile, passphrase)
return err
}
// CloseEncryptedVolume closes encrypted volume so it can be detached
func CloseEncryptedVolume(ctx context.Context, mapperFile string) error {
klog.V(4).Infof(Log(ctx, "Closing LUKS device %s"), mapperFile)
_, _, err := LuksClose(mapperFile)
return err
}
// IsDeviceOpen determines if encrypted device is already open
func IsDeviceOpen(ctx context.Context, device string) (bool, error) {
_, mappedFile, err := DeviceEncryptionStatus(ctx, device)
return (mappedFile != ""), err
}
// DeviceEncryptionStatus looks to identify if the passed device is a LUKS mapping
// and if so what the device is and the mapper name as used by LUKS.
// If not, just returns the original device and an empty string.
func DeviceEncryptionStatus(ctx context.Context, devicePath string) (mappedDevice, mapper string, err error) {
if !strings.HasPrefix(devicePath, mapperFilePathPrefix) {
return devicePath, "", nil
}
mapPath := strings.TrimPrefix(devicePath, mapperFilePathPrefix+"/")
stdout, _, err := LuksStatus(mapPath)
if err != nil {
klog.V(4).Infof(Log(ctx, "device %s is not an active LUKS device: %v"), devicePath, err)
return devicePath, "", nil
}
lines := strings.Split(string(stdout), "\n")
if len(lines) < 1 {
return "", "", fmt.Errorf("device encryption status returned no stdout for %s", devicePath)
}
if !strings.HasSuffix(lines[0], " is active.") {
// Implies this is not a LUKS device
return devicePath, "", nil
}
for i := 1; i < len(lines); i++ {
kv := strings.SplitN(strings.TrimSpace(lines[i]), ":", 2)
if len(kv) < 1 {
return "", "", fmt.Errorf("device encryption status output for %s is badly formatted: %s",
devicePath, lines[i])
}
if strings.Compare(kv[0], "device") == 0 {
return strings.TrimSpace(kv[1]), mapPath, nil
}
}
// Identified as LUKS, but failed to identify a mapped device
return "", "", fmt.Errorf("mapped device not found in path %s", devicePath)
}
// CheckRbdImageEncrypted verifies if rbd image was encrypted when created
func CheckRbdImageEncrypted(ctx context.Context, cr *Credentials, monitors, imageSpec string) (string, error) {
value, err := GetImageMeta(ctx, cr, monitors, imageSpec, encryptionMetaKey)
if err != nil {
klog.Errorf(Log(ctx, "checking image %s encrypted state metadata failed: %s"), imageSpec, err)
return "", err
}
encrypted := strings.TrimSpace(value)
klog.V(4).Infof(Log(ctx, "image %s encrypted state metadata reports %q"), imageSpec, encrypted)
return encrypted, nil
}
// SaveRbdImageEncryptionStatus sets image metadata for encryption status
func SaveRbdImageEncryptionStatus(ctx context.Context, cr *Credentials, monitors, imageSpec, status string) error {
err := SetImageMeta(ctx, cr, monitors, imageSpec, encryptionMetaKey, status)
if err != nil {
err = fmt.Errorf("failed to save image metadata encryption status for %s: %v", imageSpec, err.Error())
klog.Errorf(Log(ctx, err.Error()))
return err
}
return nil
}
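
Tying the helpers above together, a rough sketch of the LUKS setup flow for a new device (hypothetical caller; error handling shortened):

package example

import (
    "context"

    "github.com/ceph/ceph-csi/internal/util"
)

func encryptAndOpen(ctx context.Context, volumeID, devicePath string, kms util.EncryptionKMS) error {
    // fetches an existing passphrase, or generates and saves a new one
    passphrase, err := util.GetCryptoPassphrase(ctx, volumeID, kms)
    if err != nil {
        return err
    }
    if err = util.EncryptVolume(ctx, devicePath, passphrase); err != nil {
        return err
    }
    // the decrypted device appears at /dev/mapper/luks-rbd-<volumeID>
    mapperFile, _ := util.VolumeMapper(volumeID)
    return util.OpenEncryptedVolume(ctx, devicePath, mapperFile, passphrase)
}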


@@ -1,67 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bytes"
"fmt"
"os/exec"
"strings"
)
// LuksFormat sets up volume as an encrypted LUKS partition
func LuksFormat(devicePath, passphrase string) (stdout, stderr []byte, err error) {
return execCryptsetupCommand(&passphrase, "-q", "luksFormat", "--hash", "sha256", devicePath, "-d", "/dev/stdin")
}
// LuksOpen opens LUKS encrypted partition and sets up a mapping
func LuksOpen(devicePath, mapperFile, passphrase string) (stdout, stderr []byte, err error) {
return execCryptsetupCommand(&passphrase, "luksOpen", devicePath, mapperFile, "-d", "/dev/stdin")
}
// LuksClose removes existing mapping
func LuksClose(mapperFile string) (stdout, stderr []byte, err error) {
return execCryptsetupCommand(nil, "luksClose", mapperFile)
}
// LuksStatus returns encryption status of a provided device
func LuksStatus(mapperFile string) (stdout, stderr []byte, err error) {
return execCryptsetupCommand(nil, "status", mapperFile)
}
func execCryptsetupCommand(stdin *string, args ...string) (stdout, stderr []byte, err error) {
var (
program = "cryptsetup"
cmd = exec.Command(program, args...) // nolint: gosec, #nosec
sanitizedArgs = StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
if stdin != nil {
cmd.Stdin = strings.NewReader(*stdin)
}
if err := cmd.Run(); err != nil {
return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
" occurred while running %s args: %v", err, program, sanitizedArgs)
}
return stdoutBuf.Bytes(), nil, nil
}


@@ -1,74 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"strings"
)
/*
Mons returns a comma-separated MON list from the csi config for the given clusterID
Expected JSON structure in the passed in config file is,
[
{
"clusterID": "<cluster-id>",
"monitors":
[
"<monitor-value>",
"<monitor-value>",
...
]
},
...
]
*/
// clusterInfo strongly typed JSON spec for the above JSON structure
type clusterInfo struct {
ClusterID string `json:"clusterID"`
Monitors []string `json:"monitors"`
}
func Mons(pathToConfig, clusterID string) (string, error) {
var config []clusterInfo
// #nosec
content, err := ioutil.ReadFile(pathToConfig)
if err != nil {
err = fmt.Errorf("error fetching configuration for cluster ID (%s). (%s)", clusterID, err)
return "", err
}
err = json.Unmarshal(content, &config)
if err != nil {
return "", fmt.Errorf("unmarshal failed: %v. raw buffer response: %s",
err, string(content))
}
for _, cluster := range config {
if cluster.ClusterID == clusterID {
if len(cluster.Monitors) == 0 {
return "", fmt.Errorf("empty monitor list for cluster ID (%s) in config", clusterID)
}
return strings.Join(cluster.Monitors, ","), nil
}
}
return "", fmt.Errorf("missing configuration for cluster ID (%s)", clusterID)
}


@@ -1,132 +0,0 @@
/*
Copyright 2019 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"io/ioutil"
"os"
"testing"
)
var basePath = "./test_artifacts"
var csiClusters = "csi-clusters.json"
var pathToConfig = basePath + "/" + csiClusters
var clusterID1 = "test1"
var clusterID2 = "test2"
func cleanupTestData() {
os.RemoveAll(basePath)
}
// nolint: gocyclo
func TestCSIConfig(t *testing.T) {
var err error
var data string
var content string
defer cleanupTestData()
err = os.MkdirAll(basePath, 0700)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as clusterid file is missing
_, err = Mons(pathToConfig, clusterID1)
if err == nil {
t.Errorf("Failed: expected error due to missing config")
}
data = ""
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as file is empty
content, err = Mons(pathToConfig, clusterID1)
if err == nil {
t.Errorf("Failed: want (%s), got (%s)", data, content)
}
data = "[{\"clusterIDBad\":\"" + clusterID2 + "\",\"monitors\":[\"mon1\",\"mon2\",\"mon3\"]}]"
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as clusterID data is malformed
content, err = Mons(pathToConfig, clusterID2)
if err == nil {
t.Errorf("Failed: want (%s), got (%s)", data, content)
}
data = "[{\"clusterID\":\"" + clusterID2 + "\",\"monitorsBad\":[\"mon1\",\"mon2\",\"mon3\"]}]"
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as monitors key is incorrect/missing
content, err = Mons(pathToConfig, clusterID2)
if err == nil {
t.Errorf("Failed: want (%s), got (%s)", data, content)
}
data = "[{\"clusterID\":\"" + clusterID2 + "\",\"monitors\":[\"mon1\",2,\"mon3\"]}]"
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as monitor data is malformed
content, err = Mons(pathToConfig, clusterID2)
if err == nil {
t.Errorf("Failed: want (%s), got (%s)", data, content)
}
data = "[{\"clusterID\":\"" + clusterID2 + "\",\"monitors\":[\"mon1\",\"mon2\",\"mon3\"]}]"
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should fail as clusterID is not present in config
content, err = Mons(pathToConfig, clusterID1)
if err == nil {
t.Errorf("Failed: want (%s), got (%s)", data, content)
}
// TEST: Should pass as clusterID is present in config
content, err = Mons(pathToConfig, clusterID2)
if err != nil || content != "mon1,mon2,mon3" {
t.Errorf("Failed: want (%s), got (%s) (%v)", "mon1,mon2,mon3", content, err)
}
data = "[{\"clusterID\":\"" + clusterID2 + "\",\"monitors\":[\"mon1\",\"mon2\",\"mon3\"]}," +
"{\"clusterID\":\"" + clusterID1 + "\",\"monitors\":[\"mon4\",\"mon5\",\"mon6\"]}]"
err = ioutil.WriteFile(basePath+"/"+csiClusters, []byte(data), 0644)
if err != nil {
t.Errorf("Test setup error %s", err)
}
// TEST: Should pass as clusterID is present in config
content, err = Mons(pathToConfig, clusterID1)
if err != nil || content != "mon4,mon5,mon6" {
t.Errorf("Failed: want (%s), got (%s) (%v)", "mon4,mon5,mon6", content, err)
}
}


@@ -1,68 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
// ErrKeyNotFound is returned when requested key in omap is not found
type ErrKeyNotFound struct {
keyName string
err error
}
func (e ErrKeyNotFound) Error() string {
return e.err.Error()
}
// ErrObjectExists is returned when named omap is already present in rados
type ErrObjectExists struct {
objectName string
err error
}
func (e ErrObjectExists) Error() string {
return e.err.Error()
}
// ErrObjectNotFound is returned when named omap is not found in rados
type ErrObjectNotFound struct {
oMapName string
err error
}
func (e ErrObjectNotFound) Error() string {
return e.err.Error()
}
// ErrSnapNameConflict is generated when a requested CSI snap name already exists on RBD but with
// different properties, and hence is in conflict with the passed in CSI volume name
type ErrSnapNameConflict struct {
requestName string
err error
}
func (e ErrSnapNameConflict) Error() string {
return e.err.Error()
}
// ErrPoolNotFound is returned when pool is not found
type ErrPoolNotFound struct {
Pool string
Err error
}
func (e ErrPoolNotFound) Error() string {
return e.Err.Error()
}
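
Callers distinguish these error types with plain type assertions, as in this small sketch (hypothetical helper):

package example

import "github.com/ceph/ceph-csi/internal/util"

// ignoreMissingPool treats a missing pool as a non-error.
func ignoreMissingPool(err error) error {
    if _, ok := err.(util.ErrPoolNotFound); ok {
        return nil
    }
    return err
}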


@@ -1,27 +0,0 @@
package util
import (
"net"
"net/http"
"net/url"
"strconv"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
)
// ValidateURL validates the url
func ValidateURL(c *Config) error {
_, err := url.Parse(c.MetricsPath)
return err
}
// StartMetricsServer starts http server
func StartMetricsServer(c *Config) {
addr := net.JoinHostPort(c.MetricsIP, strconv.Itoa(c.MetricsPort))
http.Handle(c.MetricsPath, promhttp.Handler())
err := http.ListenAndServe(addr, nil)
if err != nil {
klog.Fatalln(err)
}
}


@@ -1,60 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"k8s.io/apimachinery/pkg/util/sets"
)
const (
// VolumeOperationAlreadyExistsFmt string format to return for concurrent operation
VolumeOperationAlreadyExistsFmt = "an operation with the given Volume ID %s already exists"
// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
)
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
// with an ongoing operation.
type VolumeLocks struct {
locks sets.String
mux sync.Mutex
}
// NewVolumeLocks returns new VolumeLocks
func NewVolumeLocks() *VolumeLocks {
return &VolumeLocks{
locks: sets.NewString(),
}
}
// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
// If another operation is already using volumeID, returns false.
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
vl.mux.Lock()
defer vl.mux.Unlock()
if vl.locks.Has(volumeID) {
return false
}
vl.locks.Insert(volumeID)
return true
}
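// Release deletes the lock on volumeID.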
func (vl *VolumeLocks) Release(volumeID string) {
vl.mux.Lock()
defer vl.mux.Unlock()
vl.locks.Delete(volumeID)
}


@@ -1,52 +0,0 @@
/*
Copyright 2019 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
)
// very basic tests for the moment
func TestIDLocker(t *testing.T) {
fakeID := "fake-id"
locks := NewVolumeLocks()
// acquire lock for fake-id
ok := locks.TryAcquire(fakeID)
if !ok {
t.Errorf("TryAcquire failed: want (%v), got (%v)",
true, ok)
}
// try to acquire lock again for fake-id, as lock is already present
// it should fail
ok = locks.TryAcquire(fakeID)
if ok {
t.Errorf("TryAcquire failed: want (%v), got (%v)",
false, ok)
}
// release the lock for fake-id and try to get lock again, it should pass
locks.Release(fakeID)
ok = locks.TryAcquire(fakeID)
if !ok {
t.Errorf("TryAcquire failed: want (%v), got (%v)",
true, ok)
}
}


@@ -1,188 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"encoding/json"
"fmt"
"os"
"regexp"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
// K8sCMCache to store metadata
type K8sCMCache struct {
Client *k8s.Clientset
Namespace string
}
const (
defaultNamespace = "default"
cmLabel = "csi-metadata"
cmDataKey = "content"
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
)
// GetK8sNamespace returns the pod namespace. If the pod namespace is empty,
// it returns the default namespace
func GetK8sNamespace() string {
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return defaultNamespace
}
return namespace
}
// NewK8sClient creates a kubernetes client
func NewK8sClient() *k8s.Clientset {
var cfg *rest.Config
var err error
cPath := os.Getenv("KUBERNETES_CONFIG_PATH")
if cPath != "" {
cfg, err = clientcmd.BuildConfigFromFlags("", cPath)
if err != nil {
klog.Errorf("Failed to get cluster config with error: %v\n", err)
os.Exit(1)
}
} else {
cfg, err = rest.InClusterConfig()
if err != nil {
klog.Errorf("Failed to get cluster config with error: %v\n", err)
os.Exit(1)
}
}
client, err := k8s.NewForConfig(cfg)
if err != nil {
klog.Errorf("Failed to create client with error: %v\n", err)
os.Exit(1)
}
return client
}
func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) {
cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(context.TODO(), resourceID, metav1.GetOptions{})
if err != nil {
return nil, err
}
return cm, nil
}
// ForAll lists the metadata in configmaps and filters them based on the pattern
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(context.TODO(), listOpts)
if err != nil {
return errors.Wrap(err, "k8s-cm-cache: failed to list metadata configmaps")
}
for i := range cms.Items {
data := cms.Items[i].Data[cmDataKey]
match, err := regexp.MatchString(pattern, cms.Items[i].ObjectMeta.Name)
if err != nil {
continue
}
if !match {
continue
}
if err = json.Unmarshal([]byte(data), destObj); err != nil {
return errors.Wrapf(err, "k8s-cm-cache: JSON unmarshaling failed for configmap %s", cms.Items[i].ObjectMeta.Name)
}
if err = f(cms.Items[i].ObjectMeta.Name); err != nil {
return err
}
}
return nil
}
// Create stores the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if cm != nil && err == nil {
klog.V(4).Infof("k8s-cm-cache: configmap %s already exists, skipping configmap creation", identifier)
return nil
}
dataJSON, err := json.Marshal(data)
if err != nil {
return errors.Wrapf(err, "k8s-cm-cache: JSON marshaling failed for configmap %s", identifier)
}
cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: identifier,
Namespace: k8scm.Namespace,
Labels: map[string]string{
csiMetadataLabelAttr: cmLabel,
},
},
Data: map[string]string{},
}
cm.Data[cmDataKey] = string(dataJSON)
_, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{})
if err != nil {
if apierrs.IsAlreadyExists(err) {
klog.V(4).Infof("k8s-cm-cache: configmap %s already exists", identifier)
return nil
}
return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier)
}
klog.V(4).Infof("k8s-cm-cache: configmap %s successfully created", identifier)
return nil
}
// Get retrieves the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if err != nil {
if apierrs.IsNotFound(err) {
return &CacheEntryNotFound{err}
}
return err
}
err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data)
if err != nil {
return errors.Wrapf(err, "k8s-cm-cache: JSON unmarshaling failed for configmap %s", identifier)
}
return nil
}
// Delete deletes the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Delete(identifier string) error {
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(context.TODO(), identifier, metav1.DeleteOptions{})
if err != nil {
if apierrs.IsNotFound(err) {
klog.V(4).Infof("k8s-cm-cache: cannot delete missing metadata configmap %s, assuming it's already deleted", identifier)
return nil
}
return errors.Wrapf(err, "k8s-cm-cache: couldn't delete metadata configmap %s", identifier)
}
klog.V(4).Infof("k8s-cm-cache: successfully deleted metadata configmap %s", identifier)
return nil
}


@@ -1,42 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"fmt"
)
type contextKey string
// CtxKey for context-based logging
var CtxKey = contextKey("ID")
// ReqID for logging request ID
var ReqID = contextKey("Req-ID")
// Log helps in context-based logging
func Log(ctx context.Context, format string) string {
id := ctx.Value(CtxKey)
if id == nil {
return format
}
a := fmt.Sprintf("ID: %v ", id)
reqID := ctx.Value(ReqID)
if reqID == nil {
return a + format
}
a += fmt.Sprintf("Req-ID: %v ", reqID)
return a + format
}
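
A small sketch of how the context keys feed into Log (hypothetical values):

package example

import (
    "context"

    "github.com/ceph/ceph-csi/internal/util"
    "k8s.io/klog"
)

func logWithIDs() {
    ctx := context.WithValue(context.Background(), util.CtxKey, "42")
    ctx = context.WithValue(ctx, util.ReqID, "pvc-1234")
    // prints: "ID: 42 Req-ID: pvc-1234 attaching volume"
    klog.Infof(util.Log(ctx, "attaching volume"))
}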


@@ -1,164 +0,0 @@
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"github.com/pkg/errors"
"k8s.io/klog"
)
// NodeCache to store metadata
type NodeCache struct {
BasePath string
CacheDir string
}
var errDec = errors.New("file not found")
// EnsureCacheDirectory creates cache directory if not present
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
fullPath := path.Join(nc.BasePath, cacheDir)
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
// #nosec
if err := os.Mkdir(fullPath, 0755); err != nil {
return errors.Wrapf(err, "node-cache: failed to create %s folder", fullPath)
}
}
return nil
}
// ForAll lists the metadata in the NodeCache and filters it based on the pattern
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
err := nc.EnsureCacheDirectory(nc.CacheDir)
if err != nil {
return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
}
files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir))
if err != nil {
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
}
cachePath := path.Join(nc.BasePath, nc.CacheDir)
for _, file := range files {
err = decodeObj(cachePath, pattern, file, destObj)
if err == errDec {
continue
} else if err == nil {
if err = f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil {
return err
}
continue
}
return err
}
return nil
}
func decodeObj(fpath, pattern string, file os.FileInfo, destObj interface{}) error {
match, err := regexp.MatchString(pattern, file.Name())
if err != nil || !match {
return errDec
}
if !strings.HasSuffix(file.Name(), ".json") {
return errDec
}
// #nosec
fp, err := os.Open(path.Join(fpath, file.Name()))
if err != nil {
klog.V(4).Infof("node-cache: open file: %s err %v", file.Name(), err)
return errDec
}
decoder := json.NewDecoder(fp)
if err = decoder.Decode(destObj); err != nil {
if err = fp.Close(); err != nil {
return errors.Wrapf(err, "failed to close file %s", file.Name())
}
return errors.Wrapf(err, "node-cache: couldn't decode file %s", file.Name())
}
return nil
}
// Create creates the metadata file in cache directory with identifier name
func (nc *NodeCache) Create(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
fp, err := os.Create(file)
if err != nil {
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
}
defer func() {
if err = fp.Close(); err != nil {
klog.Warningf("failed to close file:%s %v", fp.Name(), err)
}
}()
encoder := json.NewEncoder(fp)
if err = encoder.Encode(data); err != nil {
return errors.Wrapf(err, "node-cache: failed to encode metadata for file: %s\n", file)
}
klog.V(4).Infof("node-cache: successfully saved metadata into file: %s\n", file)
return nil
}
// Get retrieves the metadata from cache directory with identifier name
func (nc *NodeCache) Get(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
// #nosec
fp, err := os.Open(file)
if err != nil {
if os.IsNotExist(errors.Cause(err)) {
return &CacheEntryNotFound{err}
}
return errors.Wrapf(err, "node-cache: open error for %s", file)
}
defer func() {
if err = fp.Close(); err != nil {
klog.Warningf("failed to close file:%s %v", fp.Name(), err)
}
}()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(data); err != nil {
return errors.Wrap(err, "rbd: decode error")
}
return nil
}
// Delete deletes the metadata file from cache directory with identifier name
func (nc *NodeCache) Delete(identifier string) error {
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
err := os.Remove(file)
if err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof("node-cache: cannot delete missing metadata storage file %s, assuming it's already deleted", file)
return nil
}
return errors.Wrapf(err, "node-cache: error removing file %s", file)
}
klog.V(4).Infof("node-cache: successfully deleted metadata storage file at: %+v\n", file)
return nil
}
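Putting the NodeCache methods together, a minimal round-trip sketch; the payload type, path and identifiers are made up, and the cache directory is assumed to already exist (CreatePersistanceStorage, later in this changeset, creates it):
// minimal usage sketch; snapEntry, the path and the identifiers are hypothetical
type snapEntry struct {
	SourceVolume string `json:"sourceVolume"`
}

func nodeCacheRoundTrip() error {
	nc := &NodeCache{BasePath: "/var/lib/kubelet/plugins/cephcsi", CacheDir: "controller"}
	if err := nc.Create("csi-snap-example", &snapEntry{SourceVolume: "csi-vol-example"}); err != nil {
		return err
	}
	entry := &snapEntry{}
	if err := nc.Get("csi-snap-example", entry); err != nil {
		return err
	}
	// visit every cached entry whose file name matches the pattern
	return nc.ForAll("csi-snap-", entry, func(identifier string) error {
		// entry holds the decoded data for this identifier
		return nil
	})
}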

View File

@ -1,121 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
)
const (
procCgroup = "/proc/self/cgroup"
sysPidsMaxFmt = "/sys/fs/cgroup/pids%s/pids.max"
)
// getCgroupPidsFile returns the cgroups "pids.max" file of the current process
//
// find the line containing the pids group from the /proc/self/cgroup file
// $ grep 'pids' /proc/self/cgroup
// 7:pids:/kubepods.slice/kubepods-besteffort.slice/....scope
// $ cat /sys/fs/cgroup/pids + *.scope + /pids.max
func getCgroupPidsFile() (string, error) {
cgroup, err := os.Open(procCgroup)
if err != nil {
return "", err
}
defer cgroup.Close()
scanner := bufio.NewScanner(cgroup)
var slice string
for scanner.Scan() {
parts := strings.Split(scanner.Text(), ":")
if len(parts) < 3 {
continue
}
if parts[1] == "pids" {
slice = parts[2]
break
}
}
if slice == "" {
return "", fmt.Errorf("could not find a cgroup for 'pids'")
}
pidsMax := fmt.Sprintf(sysPidsMaxFmt, slice)
return pidsMax, nil
}
// GetPIDLimit returns the current PID limit, or an error. A value of -1
// translates to "max".
func GetPIDLimit() (int, error) {
pidsMax, err := getCgroupPidsFile()
if err != nil {
return 0, err
}
f, err := os.Open(pidsMax) // #nosec - intended reading from /sys/...
if err != nil {
return 0, err
}
defer f.Close()
maxPidsStr, err := bufio.NewReader(f).ReadString('\n')
if err != nil && err != io.EOF {
return 0, err
}
maxPidsStr = strings.TrimRight(maxPidsStr, "\n")
maxPids := -1
if maxPidsStr != "max" {
maxPids, err = strconv.Atoi(maxPidsStr)
if err != nil {
return 0, err
}
}
return maxPids, nil
}
// SetPIDLimit configures the given PID limit for the current process. A value
// of -1 translates to "max".
func SetPIDLimit(limit int) error {
limitStr := "max"
if limit != -1 {
limitStr = fmt.Sprintf("%d", limit)
}
pidsMax, err := getCgroupPidsFile()
if err != nil {
return err
}
f, err := os.Create(pidsMax)
if err != nil {
return err
}
defer f.Close()
_, err = f.WriteString(limitStr)
if err != nil {
return err
}
return nil
}
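A minimal sketch of how a driver binary could apply a configured limit at startup, assuming the process has privileges to write pids.max; the helper itself is hypothetical (compare the PidLimit field of the Config struct later in this changeset):
import "k8s.io/klog"

// applyPIDLimit is a hypothetical helper; a limit of 0 keeps the inherited value
func applyPIDLimit(limit int) {
	if limit == 0 {
		return
	}
	current, err := GetPIDLimit()
	if err != nil {
		klog.Errorf("failed to read current PID limit: %v", err)
		return
	}
	klog.Infof("changing PID limit from %d to %d", current, limit)
	if err = SetPIDLimit(limit); err != nil {
		klog.Errorf("failed to set PID limit: %v", err)
	}
}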

View File

@ -1,52 +0,0 @@
/*
Copyright 2019 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"os"
"testing"
)
// minimal test to check if GetPIDLimit() returns an int
// changing the limit requires root permissions, not tested
func TestGetPIDLimit(t *testing.T) {
runTest := os.Getenv("CEPH_CSI_RUN_ALL_TESTS")
if runTest == "" {
t.Skip("not running test that requires root permissions and cgroup support")
}
limit, err := GetPIDLimit()
if err != nil {
t.Errorf("no error should be returned, got: %v", err)
}
if limit == 0 {
t.Error("a PID limit of 0 is invalid")
}
// this is expected to fail when not run as root
err = SetPIDLimit(4096)
if err != nil {
t.Log("failed to set PID limit, are you running as root?")
} else {
// in case it worked, reset to the previous value
err = SetPIDLimit(limit)
if err != nil {
t.Logf("failed to reset PID to original limit %d", limit)
}
}
}

View File

@ -1,83 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"strings"
)
const (
keyArg = "--key="
keyFileArg = "--keyfile="
secretArg = "secret="
optionsArgSeparator = ','
strippedKey = "--key=***stripped***"
strippedKeyFile = "--keyfile=***stripped***"
strippedSecret = "secret=***stripped***"
)
// StripSecretInArgs strips values of either "--key"/"--keyfile" or "secret=".
// `args` is left unchanged.
// Expects only one occurrence of either "--key"/"--keyfile" or "secret=".
func StripSecretInArgs(args []string) []string {
out := make([]string, len(args))
copy(out, args)
if !stripKey(out) {
stripSecret(out)
}
return out
}
func stripKey(out []string) bool {
for i := range out {
if strings.HasPrefix(out[i], keyArg) {
out[i] = strippedKey
return true
}
if strings.HasPrefix(out[i], keyFileArg) {
out[i] = strippedKeyFile
return true
}
}
return false
}
func stripSecret(out []string) bool {
for i := range out {
arg := out[i]
begin := strings.Index(arg, secretArg)
if begin == -1 {
continue
}
end := strings.IndexByte(arg[begin+len(secretArg):], optionsArgSeparator)
out[i] = arg[:begin] + strippedSecret
if end != -1 {
out[i] += arg[begin+len(secretArg)+end:]
}
return true
}
return false
}
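For illustration, a short sketch of the stripping behaviour; all secret values are made up:
import "fmt"

func stripExamples() {
	rbdArgs := []string{"map", "pool/image", "--id", "admin", "--key=AQDvEf9dEXAMPLEKEY"}
	fmt.Println(StripSecretInArgs(rbdArgs))
	// [map pool/image --id admin --key=***stripped***]

	mountArgs := []string{"-o", "name=admin,secret=AQDvEf9dEXAMPLEKEY,mds_namespace=fs1"}
	fmt.Println(StripSecretInArgs(mountArgs))
	// [-o name=admin,secret=***stripped***,mds_namespace=fs1]
}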

View File

@ -1,256 +0,0 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)
const (
keySeparator rune = '/'
labelSeparator string = ","
)
func k8sGetNodeLabels(nodeName string) (map[string]string, error) {
client := NewK8sClient()
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get node (%s) information : %v", nodeName, err)
}
return node.GetLabels(), nil
}
// GetTopologyFromDomainLabels returns the CSI topology map, determined from
// the domain labels and their values from the CO system
// Expects domainLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...".
func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map[string]string, error) {
if domainLabels == "" {
return nil, nil
}
// size checks on domain label prefix
topologyPrefix := strings.ToLower("topology." + driverName)
if len(topologyPrefix) > 63 {
return nil, fmt.Errorf("computed topology label prefix (%s) for node exceeds length limits", topologyPrefix)
}
// driverName is validated, and we are adding a lowercase "topology." to it, so no validation for conformance
// Convert passed in labels to a map, and check for uniqueness
labelsToRead := strings.SplitN(domainLabels, labelSeparator, -1)
klog.Infof("passed in node labels for processing : %+v", labelsToRead)
labelsIn := make(map[string]bool)
labelCount := 0
for _, label := range labelsToRead {
// as we read the labels from k8s, and check for missing labels,
// no label conformance checks here
if _, ok := labelsIn[label]; ok {
return nil, fmt.Errorf("duplicate label (%s) found in domain labels", label)
}
labelsIn[label] = true
labelCount++
}
nodeLabels, err := k8sGetNodeLabels(nodeName)
if err != nil {
return nil, err
}
// Determine values for requested labels from node labels
domainMap := make(map[string]string)
found := 0
for key, value := range nodeLabels {
if _, ok := labelsIn[key]; !ok {
continue
}
// label found; split off the name component and store its value
nameIdx := strings.IndexRune(key, keySeparator)
domain := key[nameIdx+1:]
domainMap[domain] = value
labelsIn[key] = false
found++
}
// Ensure all labels are found
if found != labelCount {
missingLabels := []string{}
for key, missing := range labelsIn {
if missing {
missingLabels = append(missingLabels, key)
}
}
return nil, fmt.Errorf("missing domain labels %v on node (%s)", missingLabels, nodeName)
}
klog.Infof("list of domains processed : %+v", domainMap)
topology := make(map[string]string)
for domain, value := range domainMap {
topology[topologyPrefix+"/"+domain] = value
// TODO: when implementing domain takeover/giveback, enable a domain value that can remain pinned to the node
// topology["topology."+driverName+"/"+domain+"-pinned"] = value
}
return topology, nil
}
type topologySegment struct {
DomainLabel string `json:"domainLabel"`
DomainValue string `json:"value"`
}
// TopologyConstrainedPool stores the pool name and a list of its associated topology domain values
type TopologyConstrainedPool struct {
PoolName string `json:"poolName"`
DataPoolName string `json:"dataPool"`
DomainSegments []topologySegment `json:"domainSegments"`
}
// GetTopologyFromRequest extracts the TopologyConstrainedPools and the passed in accessibility constraints
// from a CSI CreateVolume request
func GetTopologyFromRequest(req *csi.CreateVolumeRequest) (*[]TopologyConstrainedPool, *csi.TopologyRequirement, error) {
var (
topologyPools []TopologyConstrainedPool
)
// check if parameters have pool configuration pertaining to topology
topologyPoolsStr := req.GetParameters()["topologyConstrainedPools"]
if topologyPoolsStr == "" {
return nil, nil, nil
}
// check if there are any accessibility requirements in the request
accessibilityRequirements := req.GetAccessibilityRequirements()
if accessibilityRequirements == nil {
return nil, nil, nil
}
// extract topology based pools configuration
err := json.Unmarshal([]byte(strings.Replace(topologyPoolsStr, "\n", " ", -1)), &topologyPools)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse JSON encoded topology constrained pools parameter (%s): %v", topologyPoolsStr, err)
}
return &topologyPools, accessibilityRequirements, nil
}
// MatchTopologyForPool returns the topology map, if the passed in pool matches any
// passed in accessibility constraints
func MatchTopologyForPool(topologyPools *[]TopologyConstrainedPool,
accessibilityRequirements *csi.TopologyRequirement, poolName string) (map[string]string, error) {
var topologyPool []TopologyConstrainedPool
if topologyPools == nil || accessibilityRequirements == nil {
return nil, nil
}
// find the pool in the list of topology based pools
for _, value := range *topologyPools {
if value.PoolName == poolName {
topologyPool = append(topologyPool, value)
break
}
}
if len(topologyPool) == 0 {
return nil, fmt.Errorf("none of the configured topology pools (%+v) matched passed in pool name (%s)",
topologyPools, poolName)
}
_, _, topology, err := FindPoolAndTopology(&topologyPool, accessibilityRequirements)
return topology, err
}
// FindPoolAndTopology loops through passed in "topologyPools" and also related
// accessibility requirements, to determine which pool matches the requirement.
// The return values are the image pool name, the data pool name, and the
// topology map of the matched requirement
func FindPoolAndTopology(topologyPools *[]TopologyConstrainedPool,
accessibilityRequirements *csi.TopologyRequirement) (string, string, map[string]string, error) {
if topologyPools == nil || accessibilityRequirements == nil {
return "", "", nil, nil
}
// select pool that fits first topology constraint preferred requirements
for _, topology := range accessibilityRequirements.GetPreferred() {
topologyPool := matchPoolToTopology(topologyPools, topology)
if topologyPool.PoolName != "" {
return topologyPool.PoolName, topologyPool.DataPoolName, topology.GetSegments(), nil
}
}
// If preferred mismatches, check requisite for a fit
for _, topology := range accessibilityRequirements.GetRequisite() {
topologyPool := matchPoolToTopology(topologyPools, topology)
if topologyPool.PoolName != "" {
return topologyPool.PoolName, topologyPool.DataPoolName, topology.GetSegments(), nil
}
}
return "", "", nil, fmt.Errorf("none of the topology constrained pools matched requested "+
"topology constraints : pools (%+v) requested topology (%+v)",
*topologyPools, *accessibilityRequirements)
}
// matchPoolToTopology loops through passed in pools, and for each pool checks if all
// requested topology segments are present and match the request, returning the first pool
// that matches (or an empty TopologyConstrainedPool if none do)
func matchPoolToTopology(topologyPools *[]TopologyConstrainedPool, topology *csi.Topology) TopologyConstrainedPool {
domainMap := extractDomainsFromlabels(topology)
// check if any pool matches all the domain keys and values
for _, topologyPool := range *topologyPools {
mismatch := false
// match all pool topology labels to requested topology
for _, segment := range topologyPool.DomainSegments {
if domainValue, ok := domainMap[segment.DomainLabel]; !ok || domainValue != segment.DomainValue {
mismatch = true
break
}
}
if mismatch {
continue
}
return topologyPool
}
return TopologyConstrainedPool{}
}
// extractDomainsFromlabels returns the domain name map, from passed in domain segments,
// which is of the form [prefix/]<name>
func extractDomainsFromlabels(topology *csi.Topology) map[string]string {
domainMap := make(map[string]string)
for domainKey, value := range topology.GetSegments() {
domainIdx := strings.IndexRune(domainKey, keySeparator)
domain := domainKey[domainIdx+1:]
domainMap[domain] = value
}
return domainMap
}
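To make the wiring concrete, a hedged sketch of the topologyConstrainedPools StorageClass parameter that GetTopologyFromRequest parses, followed by the pool lookup; the pool and domain names are made up:
// hypothetical "topologyConstrainedPools" parameter, matching the JSON tags above
const topologyConstrainedPools = `[
	{
		"poolName": "pool-east-a",
		"dataPool": "ec-pool-east-a",
		"domainSegments": [
			{"domainLabel": "region", "value": "east"},
			{"domainLabel": "zone", "value": "east-a"}
		]
	}
]`

func poolForRequest(req *csi.CreateVolumeRequest) (string, error) {
	pools, requirement, err := GetTopologyFromRequest(req)
	if err != nil || pools == nil {
		return "", err
	}
	poolName, _, _, err := FindPoolAndTopology(pools, requirement)
	return poolName, err
}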

View File

@ -1,379 +0,0 @@
/*
Copyright 2020 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// nolint: gocyclo
// TestFindPoolAndTopology also tests MatchTopologyForPool
func TestFindPoolAndTopology(t *testing.T) {
var err error
var label1 = "region"
var label2 = "zone"
var l1Value1 = "R1"
var l1Value2 = "R2"
var l2Value1 = "Z1"
var l2Value2 = "Z2"
var pool1 = "PoolA"
var pool2 = "PoolB"
var topologyPrefix = "prefix"
var emptyTopoPools = []TopologyConstrainedPool{}
var emptyPoolNameTopoPools = []TopologyConstrainedPool{
{
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value1,
},
{
DomainLabel: label2,
DomainValue: l2Value1,
},
},
},
}
var emptyDomainsInTopoPools = []TopologyConstrainedPool{
{
PoolName: pool1,
},
}
var partialDomainsInTopoPools = []TopologyConstrainedPool{
{
PoolName: pool1,
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value1,
},
},
},
}
var differentDomainsInTopoPools = []TopologyConstrainedPool{
{
PoolName: pool1,
DomainSegments: []topologySegment{
{
DomainLabel: label1 + "fuzz1",
DomainValue: l1Value1,
},
{
DomainLabel: label2,
DomainValue: l2Value1,
},
},
},
{
PoolName: pool2,
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value2,
},
{
DomainLabel: label2,
DomainValue: l2Value2 + "fuzz1",
},
},
},
}
var validSingletonTopoPools = []TopologyConstrainedPool{
{
PoolName: pool1,
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value1,
},
{
DomainLabel: label2,
DomainValue: l2Value1,
},
},
},
}
var validMultipleTopoPools = []TopologyConstrainedPool{
{
PoolName: pool1,
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value1,
},
{
DomainLabel: label2,
DomainValue: l2Value1,
},
},
},
{
PoolName: pool2,
DomainSegments: []topologySegment{
{
DomainLabel: label1,
DomainValue: l1Value2,
},
{
DomainLabel: label2,
DomainValue: l2Value2,
},
},
},
}
var emptyAccReq = csi.TopologyRequirement{}
var emptySegmentAccReq = csi.TopologyRequirement{
Requisite: []*csi.Topology{
{},
{},
},
}
var partialHigherSegmentAccReq = csi.TopologyRequirement{
Preferred: []*csi.Topology{
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value1,
},
},
},
}
var partialLowerSegmentAccReq = csi.TopologyRequirement{
Preferred: []*csi.Topology{
{
Segments: map[string]string{
topologyPrefix + "/" + label2: l2Value1,
},
},
},
}
var differentSegmentAccReq = csi.TopologyRequirement{
Requisite: []*csi.Topology{
{
Segments: map[string]string{
topologyPrefix + "/" + label1 + "fuzz2": l1Value1,
topologyPrefix + "/" + label2: l2Value1,
},
},
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value2,
topologyPrefix + "/" + label2: l2Value2 + "fuzz2",
},
},
},
}
var validAccReq = csi.TopologyRequirement{
Requisite: []*csi.Topology{
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value1,
topologyPrefix + "/" + label2: l2Value1,
},
},
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value2,
topologyPrefix + "/" + label2: l2Value2,
},
},
},
Preferred: []*csi.Topology{
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value1,
topologyPrefix + "/" + label2: l2Value1,
},
},
{
Segments: map[string]string{
topologyPrefix + "/" + label1: l1Value2,
topologyPrefix + "/" + label2: l2Value2,
},
},
},
}
checkOutput := func(err error, poolName string, topoSegment map[string]string) error {
if err != nil {
return fmt.Errorf("expected success, got err (%v)", err)
}
if poolName != pool1 || len(topoSegment) != 2 ||
topoSegment[topologyPrefix+"/"+label1] != l1Value1 ||
topoSegment[topologyPrefix+"/"+label2] != l2Value1 {
return fmt.Errorf("expected poolName (%s) and topoSegment (%s %s), got (%s) and (%v)", pool1,
topologyPrefix+"/"+label1+l1Value1, topologyPrefix+"/"+label2+l2Value1,
poolName, topoSegment)
}
return nil
}
// Test nil values
_, _, _, err = FindPoolAndTopology(nil, nil)
if err != nil {
t.Errorf("expected success due to nil in-args (%v)", err)
}
poolName, _, _, err := FindPoolAndTopology(&validMultipleTopoPools, nil)
if err != nil || poolName != "" {
t.Errorf("expected success due to nil accessibility requirements (err - %v) (poolName - %s)", err, poolName)
}
poolName, _, _, err = FindPoolAndTopology(nil, &validAccReq)
if err != nil || poolName != "" {
t.Errorf("expected success due to nil topology pools (err - %v) (poolName - %s)", err, poolName)
}
// Test valid accessibility requirement, with invalid topology pools values
_, _, _, err = FindPoolAndTopology(&emptyTopoPools, &validAccReq)
if err == nil {
t.Errorf("expected failure due to empty topology pools")
}
_, _, _, err = FindPoolAndTopology(&emptyPoolNameTopoPools, &validAccReq)
if err == nil {
t.Errorf("expected failure due to missing pool name in topology pools")
}
_, _, _, err = FindPoolAndTopology(&differentDomainsInTopoPools, &validAccReq)
if err == nil {
t.Errorf("expected failure due to mismatching domains in topology pools")
}
// Test valid topology pools, with invalid accessibility requirements
_, _, _, err = FindPoolAndTopology(&validMultipleTopoPools, &emptyAccReq)
if err == nil {
t.Errorf("expected failure due to empty accessibility requirements")
}
_, _, _, err = FindPoolAndTopology(&validSingletonTopoPools, &emptySegmentAccReq)
if err == nil {
t.Errorf("expected failure due to empty segments in accessibility requirements")
}
_, _, _, err = FindPoolAndTopology(&validMultipleTopoPools, &partialHigherSegmentAccReq)
if err == nil {
t.Errorf("expected failure due to partial segments in accessibility requirements")
}
_, _, _, err = FindPoolAndTopology(&validSingletonTopoPools, &partialLowerSegmentAccReq)
if err == nil {
t.Errorf("expected failure due to partial segments in accessibility requirements")
}
_, _, _, err = FindPoolAndTopology(&validMultipleTopoPools, &partialLowerSegmentAccReq)
if err == nil {
t.Errorf("expected failure due to partial segments in accessibility requirements")
}
_, _, _, err = FindPoolAndTopology(&validMultipleTopoPools, &differentSegmentAccReq)
if err == nil {
t.Errorf("expected failure due to mismatching segments in accessibility requirements")
}
// Test success cases
// If a pool is a superset of domains (either empty domain labels or partial), it can be selected
poolName, _, topoSegment, err := FindPoolAndTopology(&emptyDomainsInTopoPools, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
poolName, _, topoSegment, err = FindPoolAndTopology(&partialDomainsInTopoPools, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
// match in a singleton topology pools
poolName, _, topoSegment, err = FindPoolAndTopology(&validSingletonTopoPools, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
// match first in multiple topology pools
poolName, _, topoSegment, err = FindPoolAndTopology(&validMultipleTopoPools, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
// match non-first in multiple topology pools
switchPoolOrder := []TopologyConstrainedPool{}
switchPoolOrder = append(switchPoolOrder, validMultipleTopoPools[1], validMultipleTopoPools[0])
poolName, _, topoSegment, err = FindPoolAndTopology(&switchPoolOrder, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
// test valid dataPool return
for i := range switchPoolOrder {
switchPoolOrder[i].DataPoolName = "ec-" + switchPoolOrder[i].PoolName
}
poolName, dataPoolName, topoSegment, err := FindPoolAndTopology(&switchPoolOrder, &validAccReq)
err = checkOutput(err, poolName, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
if dataPoolName != "ec-"+poolName {
t.Errorf("expected data pool to be named ec-%s, got %s", poolName, dataPoolName)
}
// TEST: MatchTopologyForPool
// check for non-existent pool
_, err = MatchTopologyForPool(&validMultipleTopoPools, &validAccReq, pool1+"fuzz")
if err == nil {
t.Errorf("expected failure due to non-existent pool name (%s) got success", pool1+"fuzz")
}
// check for existing pool
topoSegment, err = MatchTopologyForPool(&validMultipleTopoPools, &validAccReq, pool1)
err = checkOutput(err, pool1, topoSegment)
if err != nil {
t.Errorf("expected success got: (%v)", err)
}
}
/*
// TODO: To test GetTopologyFromDomainLabels we need it to accept a k8s client interface, to mock k8sGetNodeLabels output
func TestGetTopologyFromDomainLabels(t *testing.T) {
fakeNodes := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "worker1",
Labels: map[string]string{
"prefix/region": "R1",
"prefix/zone": "Z1",
},
},
}
client := fake.NewSimpleClientset(&fakeNodes)
_, err := k8sGetNodeLabels(client, "nodeName")
if err == nil {
t.Error("Expected error due to invalid node name, got success")
}
labels, err := k8sGetNodeLabels(client, "worker1")
if err != nil {
t.Errorf("Expected success, got err (%v)", err)
}
t.Errorf("Read labels (%v)", labels)
}*/

View File

@ -1,195 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"math"
"os"
"path"
"strings"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/cloud-provider/volume/helpers"
"k8s.io/klog"
"k8s.io/utils/mount"
)
// RoundOffVolSize rounds up the given quantity to chunks of MiB/GiB
func RoundOffVolSize(size int64) int64 {
size = RoundOffBytes(size)
// convert size back to MiB for rbd CLI
return size / helpers.MiB
}
// RoundOffBytes rounds up the size to the next MiB or GiB boundary:
// 1.1MiB is rounded up to 2MiB, and likewise for GiB;
// a size of less than 1MiB is rounded up to 1MiB
func RoundOffBytes(bytes int64) int64 {
var num int64
floatBytes := float64(bytes)
// round off the value if its in decimal
if floatBytes < helpers.GiB {
num = int64(math.Ceil(floatBytes / helpers.MiB))
num *= helpers.MiB
} else {
num = int64(math.Ceil(floatBytes / helpers.GiB))
num *= helpers.GiB
}
return num
}
// variables which will be set during the build time
var (
// GitCommit tell the latest git commit image is built from
GitCommit string
// DriverVersion which will be driver version
DriverVersion string
)
// Config holds the parameters list which can be configured
type Config struct {
Vtype string // driver type [rbd|cephfs|liveness]
Endpoint string // CSI endpoint
DriverName string // name of the driver
NodeID string // node id
InstanceID string // unique ID distinguishing this instance of Ceph CSI
MetadataStorage string // metadata persistence method [node|k8s_configmap]
PluginPath string // location of cephcsi plugin
DomainLabels string // list of domain labels to read from the node
// cephfs related flags
MountCacheDir string // mount info cache save dir
// metrics related flags
MetricsPath string // path of prometheus endpoint where metrics will be available
HistogramOption string // histogram option for grpc metrics; comma separated values, e.g. "0.5,2,6" where start=0.5, factor=2, count=6
MetricsIP string // IP address on which liveness/metrics requests are served
PidLimit int // PID limit to configure through cgroups
MetricsPort int // TCP port for liveness/grpc metrics requests
PollTime time.Duration // time interval in seconds between each poll
PoolTimeout time.Duration // probe timeout in seconds
EnableGRPCMetrics bool // option to enable grpc metrics
IsControllerServer bool // if set to true start provisioner server
IsNodeServer bool // if set to true start node server
Version bool // cephcsi version
// cephfs related flags
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
}
// CreatePersistanceStorage creates storage path and initializes new cache
func CreatePersistanceStorage(sPath, metaDataStore, pluginPath string) (CachePersister, error) {
var err error
if err = CreateMountPoint(path.Join(sPath, "controller")); err != nil {
klog.Errorf("failed to create persistent storage for controller: %v", err)
return nil, err
}
if err = CreateMountPoint(path.Join(sPath, "node")); err != nil {
klog.Errorf("failed to create persistent storage for node: %v", err)
return nil, err
}
cp, err := NewCachePersister(metaDataStore, pluginPath)
if err != nil {
klog.Errorf("failed to define cache persistence method: %v", err)
return nil, err
}
return cp, err
}
// ValidateDriverName validates the driver name
func ValidateDriverName(driverName string) error {
if driverName == "" {
return errors.New("driver name is empty")
}
if len(driverName) > 63 {
return errors.New("driver name length should be less than 63 chars")
}
var err error
for _, msg := range validation.IsDNS1123Subdomain(strings.ToLower(driverName)) {
if err == nil {
err = errors.New(msg)
continue
}
err = errors.Wrap(err, msg)
}
return err
}
// GenerateVolID generates a volume ID based on passed in parameters and version, to be returned
// to the CO system
func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, locationID int64, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
var err error
if locationID == InvalidPoolID {
locationID, err = GetPoolID(ctx, monitors, cr, pool)
if err != nil {
return "", err
}
}
// generate the volume ID to return to the CO system
vi := CSIIdentifier{
LocationID: locationID,
EncodingVersion: volIDVersion,
ClusterID: clusterID,
ObjectUUID: objUUID,
}
volID, err := vi.ComposeCSIID()
return volID, err
}
// CreateMountPoint creates the directory with given path
func CreateMountPoint(mountPath string) error {
return os.MkdirAll(mountPath, 0750)
}
// checkDirExists checks whether the directory exists
func checkDirExists(p string) bool {
if _, err := os.Stat(p); os.IsNotExist(err) {
return false
}
return true
}
// IsMountPoint checks whether the given path is a mountpoint
func IsMountPoint(p string) (bool, error) {
dummyMount := mount.New("")
notMnt, err := dummyMount.IsLikelyNotMountPoint(p)
if err != nil {
return false, status.Error(codes.Internal, err.Error())
}
return !notMnt, nil
}
// Mount mounts the source to target path
func Mount(source, target, fstype string, options []string) error {
dummyMount := mount.New("")
return dummyMount.Mount(source, target, fstype, options)
}
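As a worked example for GenerateVolID above: when the pool ID is already known, no Ceph calls are made and the function only composes the CSI identifier (CSIIdentifier and ComposeCSIID appear later in this changeset); all values below are made up:
import "context"

func volIDExample() (string, error) {
	// locationID 2 is already known, so GetPoolID is not consulted and the
	// monitors/credentials arguments stay unused
	return GenerateVolID(context.TODO(), "", nil, 2, "replicapool",
		"cluster-1", "00000000-1111-2222-bbbb-cacacacacaca", 1)
}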

View File

@ -1,144 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
)
func TestRoundOffBytes(t *testing.T) {
type args struct {
bytes int64
}
tests := []struct {
name string
args args
want int64
}{
{
"1MiB conversions",
args{
bytes: 1048576,
},
1048576,
},
{
"1000kiB conversion",
args{
bytes: 1000,
},
1048576, // equal to 1MiB
},
{
"1.5Mib conversion",
args{
bytes: 1572864,
},
2097152, // equal to 2MiB
},
{
"1.1MiB conversion",
args{
bytes: 1153434,
},
2097152, // equal to 2MiB
},
{
"1.5GiB conversion",
args{
bytes: 1610612736,
},
2147483648, // equal to 2GiB
},
{
"1.1GiB conversion",
args{
bytes: 1181116007,
},
2147483648, // equal to 2GiB
},
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
if got := RoundOffBytes(ts.args.bytes); got != ts.want {
t.Errorf("RoundOffBytes() = %v, want %v", got, ts.want)
}
})
}
}
func TestRoundOffVolSize(t *testing.T) {
type args struct {
size int64
}
tests := []struct {
name string
args args
want int64
}{
{
"1MiB conversions",
args{
size: 1048576,
},
1, // MiB
},
{
"1000kiB conversion",
args{
size: 1000,
},
1, // MiB
},
{
"1.5Mib conversion",
args{
size: 1572864,
},
2, // MiB
},
{
"1.1MiB conversion",
args{
size: 1153434,
},
2, // MiB
},
{
"1.5GiB conversion",
args{
size: 1610612736,
},
2048, // MiB
},
{
"1.1GiB conversion",
args{
size: 1181116007,
},
2048, // MiB
},
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
if got := RoundOffVolSize(ts.args.size); got != ts.want {
t.Errorf("RoundOffVolSize() = %v, want %v", got, ts.want)
}
})
}
}

View File

@ -1,80 +0,0 @@
package util
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ValidateNodeStageVolumeRequest validates the node stage request
func ValidateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error {
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "volume capability missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "volume ID missing in request")
}
if req.GetStagingTargetPath() == "" {
return status.Error(codes.InvalidArgument, "staging target path missing in request")
}
if len(req.GetSecrets()) == 0 {
return status.Error(codes.InvalidArgument, "stage secrets cannot be nil or empty")
}
// validate that the staging path exists
ok := checkDirExists(req.GetStagingTargetPath())
if !ok {
return status.Error(codes.InvalidArgument, "staging path does not exist on node")
}
return nil
}
// ValidateNodeUnstageVolumeRequest validates the node unstage request
func ValidateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error {
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "volume ID missing in request")
}
if req.GetStagingTargetPath() == "" {
return status.Error(codes.InvalidArgument, "staging target path missing in request")
}
return nil
}
// ValidateNodePublishVolumeRequest validates the node publish request
func ValidateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "volume capability missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "target path missing in request")
}
if req.GetStagingTargetPath() == "" {
return status.Error(codes.InvalidArgument, "staging target path missing in request")
}
return nil
}
// ValidateNodeUnpublishVolumeRequest validates the node unpublish request
func ValidateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "target path missing in request")
}
return nil
}
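These validators are intended to run first in the corresponding CSI handlers. A minimal sketch; the nodeServer type and the staging logic are placeholders:
import (
	"context"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

// nodeServer is a placeholder for a driver's CSI NodeServer implementation
type nodeServer struct{}

func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	if err := ValidateNodeStageVolumeRequest(req); err != nil {
		return nil, err
	}
	// ... map and mount the volume at req.GetStagingTargetPath() ...
	return &csi.NodeStageVolumeResponse{}, nil
}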

View File

@ -1,337 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
)
const (
// path to service account token that will be used to authenticate with Vault
// #nosec
serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
// vault configuration defaults
vaultDefaultAuthPath = "/v1/auth/kubernetes/login"
vaultDefaultRole = "csi-kubernetes"
vaultDefaultNamespace = ""
vaultDefaultPassphraseRoot = "/v1/secret"
vaultDefaultPassphrasePath = ""
// vault request headers
vaultTokenHeader = "X-Vault-Token" // nolint: gosec, #nosec
vaultNamespaceHeader = "X-Vault-Namespace"
)
/*
VaultKMS represents a Hashicorp Vault KMS configuration
Example JSON structure in the KMS config is,
{
"local_vault_unique_identifier": {
"encryptionKMSType": "vault",
"vaultAddress": "https://127.0.0.1:8500",
"vaultAuthPath": "/v1/auth/kubernetes/login",
"vaultRole": "csi-kubernetes",
"vaultNamespace": "",
"vaultPassphraseRoot": "/v1/secret",
"vaultPassphrasePath": "",
"vaultCAVerify": true,
"vaultCAFromSecret": "vault-ca"
},
...
}
*/
type VaultKMS struct {
EncryptionKMSID string
VaultAddress string
VaultAuthPath string
VaultRole string
VaultNamespace string
VaultPassphraseRoot string
VaultPassphrasePath string
VaultCAVerify bool
vaultCA *x509.CertPool
}
// InitVaultKMS returns an interface to HashiCorp Vault KMS
func InitVaultKMS(kmsID string, config, secrets map[string]string) (EncryptionKMS, error) {
var (
ok bool
err error
)
kms := &VaultKMS{}
kms.EncryptionKMSID = kmsID
kms.VaultAddress, ok = config["vaultAddress"]
if !ok || kms.VaultAddress == "" {
return nil, fmt.Errorf("missing 'vaultAddress' for vault KMS %s", kmsID)
}
kms.VaultAuthPath, ok = config["vaultAuthPath"]
if !ok || kms.VaultAuthPath == "" {
kms.VaultAuthPath = vaultDefaultAuthPath
}
kms.VaultRole, ok = config["vaultRole"]
if !ok || kms.VaultRole == "" {
kms.VaultRole = vaultDefaultRole
}
kms.VaultNamespace, ok = config["vaultNamespace"]
if !ok || kms.VaultNamespace == "" {
kms.VaultNamespace = vaultDefaultNamespace
}
kms.VaultPassphraseRoot, ok = config["vaultPassphraseRoot"]
if !ok || kms.VaultPassphraseRoot == "" {
kms.VaultPassphraseRoot = vaultDefaultPassphraseRoot
}
kms.VaultPassphrasePath, ok = config["vaultPassphrasePath"]
if !ok || kms.VaultPassphrasePath == "" {
kms.VaultPassphrasePath = vaultDefaultPassphrasePath
}
kms.VaultCAVerify = true
verifyCA, ok := config["vaultCAVerify"]
if ok {
kms.VaultCAVerify, err = strconv.ParseBool(verifyCA)
if err != nil {
return nil, fmt.Errorf("failed to parse 'vaultCAVerify' for vault <%s> kms config: %s",
kmsID, err)
}
}
vaultCAFromSecret, ok := config["vaultCAFromSecret"]
if ok && vaultCAFromSecret != "" {
caPEM, ok := secrets[vaultCAFromSecret]
if !ok {
return nil, fmt.Errorf("missing vault CA in secret %s", vaultCAFromSecret)
}
roots := x509.NewCertPool()
ok = roots.AppendCertsFromPEM([]byte(caPEM))
if !ok {
return nil, fmt.Errorf("failed loading CA bundle for vault from secret %s",
vaultCAFromSecret)
}
kms.vaultCA = roots
}
return kms, nil
}
// GetID returns the correlation ID of the KMS configuration
func (kms *VaultKMS) GetID() string {
return kms.EncryptionKMSID
}
// GetPassphrase returns passphrase from Vault
func (kms *VaultKMS) GetPassphrase(key string) (string, error) {
var passphrase string
resp, err := kms.request("GET", kms.getKeyDataURI(key), nil)
if err != nil {
return "", fmt.Errorf("failed to retrieve passphrase for %s from vault: %s",
key, err)
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return "", MissingPassphrase{fmt.Errorf("passphrase for %s not found", key)}
}
err = kms.processError(resp, fmt.Sprintf("get passphrase for %s", key))
if err != nil {
return "", err
}
// parse resp as JSON and retrieve the passphrase
var result map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return "", fmt.Errorf("failed parsing passphrase for %s from response: %s",
key, err)
}
data, ok := result["data"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("failed parsing data for get passphrase request for %s", key)
}
data, ok = data["data"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("failed parsing data.data for get passphrase request for %s", key)
}
passphrase, ok = data["passphrase"].(string)
if !ok {
return "", fmt.Errorf("failed parsing passphrase for get passphrase request for %s", key)
}
return passphrase, nil
}
// SavePassphrase saves new passphrase in Vault
func (kms *VaultKMS) SavePassphrase(key, value string) error {
data, err := json.Marshal(map[string]map[string]string{
"data": {
"passphrase": value,
},
})
if err != nil {
return fmt.Errorf("passphrase request data is broken: %s", err)
}
resp, err := kms.request("POST", kms.getKeyDataURI(key), data)
if err != nil {
return fmt.Errorf("failed to POST passphrase for %s to vault: %s", key, err)
}
defer resp.Body.Close()
err = kms.processError(resp, "save passphrase")
if err != nil {
return err
}
return nil
}
// DeletePassphrase deletes passphrase from Vault
func (kms *VaultKMS) DeletePassphrase(key string) error {
vaultToken, err := kms.getAccessToken()
if err != nil {
return fmt.Errorf("could not retrieve vault token to delete the passphrase at %s: %s",
key, err)
}
resp, err := kms.send("DELETE", kms.getKeyMetadataURI(key), &vaultToken, nil)
if err != nil {
return fmt.Errorf("delete passphrase at %s request to vault failed: %s", key, err)
}
defer resp.Body.Close()
if resp.StatusCode != 404 {
err = kms.processError(resp, "delete passphrase")
if err != nil {
return err
}
}
return nil
}
func (kms *VaultKMS) getKeyDataURI(key string) string {
return kms.VaultPassphraseRoot + "/data/" + kms.VaultPassphrasePath + key
}
func (kms *VaultKMS) getKeyMetadataURI(key string) string {
return kms.VaultPassphraseRoot + "/metadata/" + kms.VaultPassphrasePath + key
}
/*
getAccessToken retrieves a vault token using kubernetes authentication:
1. read the jwt service account token from its well-known location
2. request a token from vault using the service account jwt token
Vault verifies the service account jwt token with Kubernetes and returns a
token if the requester is allowed
*/
func (kms *VaultKMS) getAccessToken() (string, error) {
saToken, err := ioutil.ReadFile(serviceAccountTokenPath)
if err != nil {
return "", fmt.Errorf("service account token could not be read: %s", err)
}
data, err := json.Marshal(map[string]string{
"role": kms.VaultRole,
"jwt": string(saToken),
})
if err != nil {
return "", fmt.Errorf("vault token request data is broken: %s", err)
}
resp, err := kms.send("POST", kms.VaultAuthPath, nil, data)
if err != nil {
return "", fmt.Errorf("failed to retrieve vault token: %s", err)
}
defer resp.Body.Close()
err = kms.processError(resp, "retrieve vault token")
if err != nil {
return "", err
}
// parse resp as JSON and retrieve vault token
var result map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return "", fmt.Errorf("failed parsing vaultToken from response: %s", err)
}
auth, ok := result["auth"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("failed parsing vault token auth data")
}
vaultToken, ok := auth["client_token"].(string)
if !ok {
return "", fmt.Errorf("failed parsing vault client_token")
}
return vaultToken, nil
}
func (kms *VaultKMS) processError(resp *http.Response, action string) error {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to %s (%v), error body parsing failed: %s",
action, resp.StatusCode, err)
}
return fmt.Errorf("failed to %s (%v): %s", action, resp.StatusCode, body)
}
func (kms *VaultKMS) request(method, path string, data []byte) (*http.Response, error) {
vaultToken, err := kms.getAccessToken()
if err != nil {
return nil, err
}
return kms.send(method, path, &vaultToken, data)
}
func (kms *VaultKMS) send(method, path string, token *string, data []byte) (*http.Response, error) {
tlsConfig := &tls.Config{}
if !kms.VaultCAVerify {
tlsConfig.InsecureSkipVerify = true
}
if kms.vaultCA != nil {
tlsConfig.RootCAs = kms.vaultCA
}
netTransport := &http.Transport{TLSClientConfig: tlsConfig}
client := &http.Client{Transport: netTransport}
var dataToSend io.Reader
if data != nil {
dataToSend = strings.NewReader(string(data))
}
req, err := http.NewRequest(method, kms.VaultAddress+path, dataToSend)
if err != nil {
return nil, fmt.Errorf("could not create a Vault request: %s", err)
}
if kms.VaultNamespace != "" {
req.Header.Set(vaultNamespaceHeader, kms.VaultNamespace)
}
if token != nil {
req.Header.Set(vaultTokenHeader, *token)
}
return client.Do(req)
}
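A hedged sketch of wiring the Vault KMS together; the address and key values are made up, and the EncryptionKMS interface (defined elsewhere in this changeset) is assumed to expose the passphrase methods implemented above:
func vaultExample(secrets map[string]string) (string, error) {
	config := map[string]string{
		"vaultAddress":  "https://vault.example.com:8200",
		"vaultCAVerify": "true",
	}
	kms, err := InitVaultKMS("local_vault_unique_identifier", config, secrets)
	if err != nil {
		return "", err
	}
	// store a passphrase for a (made-up) volume, then read it back
	if err = kms.SavePassphrase("volume-uuid", "generated-passphrase"); err != nil {
		return "", err
	}
	return kms.GetPassphrase("volume-uuid")
}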

View File

@ -1,151 +0,0 @@
/*
Copyright 2019 Ceph-CSI authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/binary"
"encoding/hex"
"errors"
"strings"
)
/*
CSIIdentifier contains the elements that form a CSI ID to be returned by the CSI plugin, and
contains enough information to decompose and extract required cluster and pool information to locate
the volume that relates to the CSI ID.
The CSI identifier is composed as elaborated in the comment against ComposeCSIID and thus,
DecomposeCSIID is the inverse of the same function.
The CSIIdentifier structure carries the following fields,
- LocationID: 64 bit integer identifier determining the location of the volume on the Ceph cluster.
It is the ID of the poolname or fsname, for RBD or CephFS backed volumes respectively.
- EncodingVersion: Carries the version number of the encoding scheme used to encode the CSI ID,
and is preserved for any future proofing w.r.t changes in the encoding scheme, and to retain
ability to parse backward compatible encodings.
- ClusterID: Is a unique ID per cluster that the CSI instance is serving and is restricted to
lengths that can be accommodated in the encoding scheme.
- ObjectUUID: Is the on-disk uuid of the object (image/snapshot) name, for the CSI volume that
corresponds to this CSI ID.
*/
type CSIIdentifier struct {
LocationID int64
EncodingVersion uint16
ClusterID string
ObjectUUID string
}
// This maximum comes from the CSI spec on max bytes allowed in the various CSI ID fields
const maxVolIDLen = 128
/*
ComposeCSIID composes a CSI ID from passed in parameters.
Version 1 of the encoding scheme is as follows,
[csi_id_version=1:4byte] + [-:1byte]
[length of clusterID=1:4byte] + [-:1byte]
[clusterID:36bytes (MAX)] + [-:1byte]
[poolID:16bytes] + [-:1byte]
[ObjectUUID:36bytes]
Total of constant field lengths, including '-' field separators would hence be,
4+1+4+1+1+16+1+36 = 64
*/
const (
knownFieldSize = 64
uuidSize = 36
)
func (ci CSIIdentifier) ComposeCSIID() (string, error) {
buf16 := make([]byte, 2)
buf64 := make([]byte, 8)
if (knownFieldSize + len(ci.ClusterID)) > maxVolIDLen {
return "", errors.New("CSI ID encoding length overflow")
}
if len(ci.ObjectUUID) != uuidSize {
return "", errors.New("CSI ID invalid object uuid")
}
binary.BigEndian.PutUint16(buf16, ci.EncodingVersion)
versionEncodedHex := hex.EncodeToString(buf16)
binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID)))
clusterIDLength := hex.EncodeToString(buf16)
binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID))
poolIDEncodedHex := hex.EncodeToString(buf64)
return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID,
poolIDEncodedHex, ci.ObjectUUID}, "-"), nil
}
/*
DecomposeCSIID reconstructs a CSIIdentifier from the passed in string
*/
func (ci *CSIIdentifier) DecomposeCSIID(composedCSIID string) (err error) {
bytesToProcess := uint16(len(composedCSIID))
// if the length is less than the expected constant elements, then bail out!
if bytesToProcess < knownFieldSize {
return errors.New("failed to decode CSI identifier, string underflow")
}
buf16, err := hex.DecodeString(composedCSIID[0:4])
if err != nil {
return err
}
ci.EncodingVersion = binary.BigEndian.Uint16(buf16)
// 4 for version encoding and 1 for '-' separator
bytesToProcess -= 5
buf16, err = hex.DecodeString(composedCSIID[5:9])
if err != nil {
return err
}
clusterIDLength := binary.BigEndian.Uint16(buf16)
// 4 for length encoding and 1 for '-' separator
bytesToProcess -= 5
if bytesToProcess < (clusterIDLength + 1) {
return errors.New("failed to decode CSI identifier, string underflow")
}
ci.ClusterID = composedCSIID[10 : 10+clusterIDLength]
// additional 1 for '-' separator
bytesToProcess -= (clusterIDLength + 1)
nextFieldStartIdx := 10 + clusterIDLength + 1
if bytesToProcess < 17 {
return errors.New("failed to decode CSI identifier, string underflow")
}
buf64, err := hex.DecodeString(composedCSIID[nextFieldStartIdx : nextFieldStartIdx+16])
if err != nil {
return err
}
ci.LocationID = int64(binary.BigEndian.Uint64(buf64))
// 16 for poolID encoding and 1 for '-' separator
bytesToProcess -= 17
nextFieldStartIdx += 17
// has to be an exact match
if bytesToProcess != uuidSize {
return errors.New("failed to decode CSI identifier, string size mismatch")
}
ci.ObjectUUID = composedCSIID[nextFieldStartIdx : nextFieldStartIdx+uuidSize]
return err
}

View File

@ -1,95 +0,0 @@
/*
Copyright 2019 Ceph-CSI authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
)
type testTuple struct {
vID CSIIdentifier
composedVolID string
wantEnc bool
wantEncError bool
wantDec bool
wantDecError bool
}
// TODO: Add more test tuples to test out other edge conditions
var testData = []testTuple{
{
vID: CSIIdentifier{
LocationID: 0xffff,
EncodingVersion: 0xffff,
ClusterID: "01616094-9d93-4178-bf45-c7eac19e8b15",
ObjectUUID: "00000000-1111-2222-bbbb-cacacacacaca",
},
composedVolID: "ffff-0024-01616094-9d93-4178-bf45-c7eac19e8b15-000000000000ffff-00000000-1111-2222-bbbb-cacacacacaca",
wantEnc: true,
wantEncError: false,
wantDec: true,
wantDecError: false,
},
}
func TestComposeDecomposeID(t *testing.T) {
var (
err error
viDecompose CSIIdentifier
composedVolID string
)
for _, test := range testData {
if test.wantEnc {
composedVolID, err = test.vID.ComposeCSIID()
if err != nil && !test.wantEncError {
t.Errorf("Composing failed: want (%#v), got (%#v %#v)",
test, composedVolID, err)
}
if err == nil && test.wantEncError {
t.Errorf("Composing failed: want (%#v), got (%#v %#v)",
test, composedVolID, err)
}
if !test.wantEncError && err == nil && composedVolID != test.composedVolID {
t.Errorf("Composing failed: want (%#v), got (%#v %#v)",
test, composedVolID, err)
}
}
if test.wantDec {
err = viDecompose.DecomposeCSIID(test.composedVolID)
if err != nil && !test.wantDecError {
t.Errorf("Decomposing failed: want (%#v), got (%#v %#v)",
test, viDecompose, err)
}
if err == nil && test.wantDecError {
t.Errorf("Decomposing failed: want (%#v), got (%#v %#v)",
test, viDecompose, err)
}
if !test.wantDecError && err == nil && viDecompose != test.vID {
t.Errorf("Decomposing failed: want (%#v), got (%#v %#v)",
test, viDecompose, err)
}
}
}
}

View File

@ -1,627 +0,0 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"strings"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"k8s.io/klog"
)
// Length of string representation of uuid, xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx is 36 bytes
const uuidEncodedLength = 36
/*
RADOS omaps usage:
This note details how we preserve the idempotent nature of create requests and retain the relationship
between orchestrator (CO) generated names and plugin generated names for volumes and snapshots.
NOTE: volume denotes an rbd image or a CephFS subvolume
The implementation uses Ceph RADOS omaps to preserve the relationship between request name and
generated volume (or snapshot) name. There are 4 types of omaps in use,
- A "csi.volumes.[csi-id]" (or "csi.volumes"+.+CSIInstanceID), (referred to using csiDirectory variable)
- stores keys named using the CO generated names for volume requests (prefixed with csiNameKeyPrefix)
- keys are named "csi.volume."+[CO generated VolName]
- Key value contains the volume uuid that is created, for the CO provided name
- A "csi.snaps.[csi-id]" (or "csi.snaps"+.+CSIInstanceID), (referred to using csiDirectory variable)
- stores keys named using the CO generated names for snapshot requests (prefixed with csiNameKeyPrefix)
- keys are named "csi.snap."+[CO generated SnapName]
- Key value contains the snapshot uuid that is created, for the CO provided name
- A per volume omap named "csi.volume."+[volume uuid], (referred to as CephUUIDDirectory)
- stores the key named "csi.volname", that has the value of the CO generated VolName that
this volume refers to (referred to using csiNameKey value)
- stores the key named "csi.imagename", that has the value of the Ceph RBD image name
this volume refers to (referred to using csiImageKey value)
- A per snapshot omap named "rbd.csi.snap."+[RBD snapshot uuid], (referred to as CephUUIDDirectory)
- stores a key named "csi.snapname", that has the value of the CO generated SnapName that this
snapshot refers to (referred to using csiNameKey value)
- stores the key named "csi.imagename", that has the value of the Ceph RBD image name
this snapshot refers to (referred to using csiImageKey value)
- stores a key named "csi.source", that has the value of the volume name that is the
source of the snapshot (referred to using cephSnapSourceKey value)
Creation of omaps:
When a volume create request is received (or a snapshot create, the snapshot is not detailed in this
comment further as the process is similar),
- The csiDirectory is consulted to find if there is already a key with the CO VolName, and if present,
it is used to read its references to reach the UUID that backs this VolName, to check if the
UUID based volume can satisfy the requirements for the request
- If during the process of checking the same, it is found that some linking information is stale
or missing, the corresponding keys up to the key in the csiDirectory are cleaned up, to start afresh
- If the key with the CO VolName is not found, or was cleaned up, the request is treated as a
new create request, and an CephUUIDDirectory is created first with a generated uuid, this ensures
that we do not use a uuid that is already in use
- Next, a key with the VolName is created in the csiDirectory, and its value is updated to store the
generated uuid
- This is followed by updating the CephUUIDDirectory with the VolName in the csiNameKey and the RBD image
name in the csiImageKey
- Finally, the volume is created (or promoted from a snapshot, if content source was provided),
using the uuid and a corresponding name prefix (namingPrefix) as the volume name
The entire operation is locked based on VolName hash, to ensure there is only ever a single entity
modifying the related omaps for a given VolName.
This ensures the idempotent nature of creates, as the same CO generated VolName would attempt to use
the same volume uuid to serve the request, with the relations saved in the respective omaps.
Deletion of omaps:
Delete requests would not contain the VolName, hence deletion uses the volume ID, which is encoded
with the volume uuid in it, to find the volume and the CephUUIDDirectory. The CephUUIDDirectory is
read to get the VolName that this image points to. This VolName can be further used to read and
delete the key from the csiDirectory.
As we trace back and find the VolName, we also take a hash based lock on the VolName before
proceeding with deleting the volume and the related omap entries, to ensure there is only ever a
single entity modifying the related omaps for a given VolName.
*/
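// Illustrative layout (hypothetical request name and uuid values, shown only to make the
// note above concrete; not actual on-disk data): a CO request "pvc-1234" backed by uuid
// "e0b45b52-..." would result in omaps roughly like:
//   "csi.volumes.[csi-id]" omap:      "csi.volume.pvc-1234" -> "e0b45b52-..."
//   "csi.volume.e0b45b52-..." omap:   "csi.volname"   -> "pvc-1234"
//                                     "csi.imagename" -> "csi-vol-e0b45b52-..."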
const (
defaultVolumeNamingPrefix string = "csi-vol-"
defaultSnapshotNamingPrefix string = "csi-snap-"
)
// CSIJournal defines the interface and the required key names for the above RADOS based OMaps
type CSIJournal struct {
// csiDirectory is the name of the CSI volumes object map that contains CSI volume-name (or
// snapshot name) based keys
csiDirectory string
// CSI volume-name keyname prefix, for key in csiDirectory, suffix is the CSI passed volume name
csiNameKeyPrefix string
// Per Ceph volume (RBD/FS-subvolume) object map name prefix, suffix is the generated volume uuid
cephUUIDDirectoryPrefix string
// CSI volume-name key in per Ceph volume object map, containing CSI volume-name for which the
// Ceph volume was created
csiNameKey string
// CSI image-name key in per Ceph volume object map, containing RBD image-name
// of this Ceph volume
csiImageKey string
// pool ID where csiDirectory is maintained, as it can be different from where the ceph volume
// object map is maintained, during topology based provisioning
csiJournalPool string
// source volume name key in per Ceph snapshot object map, containing Ceph source volume uuid
// for which the snapshot was created
cephSnapSourceKey string
// namespace in which the RADOS objects are stored, default is no namespace
namespace string
// encryptKMS in which encryption passphrase was saved, default is no encryption
encryptKMSKey string
}
// NewCSIVolumeJournal returns an instance of CSIJournal for volumes
func NewCSIVolumeJournal() *CSIJournal {
return &CSIJournal{
csiDirectory: "csi.volumes",
csiNameKeyPrefix: "csi.volume.",
cephUUIDDirectoryPrefix: "csi.volume.",
csiNameKey: "csi.volname",
csiImageKey: "csi.imagename",
csiJournalPool: "csi.journalpool",
cephSnapSourceKey: "",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
}
}
// NewCSISnapshotJournal returns an instance of CSIJournal for snapshots
func NewCSISnapshotJournal() *CSIJournal {
return &CSIJournal{
csiDirectory: "csi.snaps",
csiNameKeyPrefix: "csi.snap.",
cephUUIDDirectoryPrefix: "csi.snap.",
csiNameKey: "csi.snapname",
csiImageKey: "csi.imagename",
csiJournalPool: "csi.journalpool",
cephSnapSourceKey: "csi.source",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
}
}
// GetNameForUUID returns volume name
func (cj *CSIJournal) GetNameForUUID(prefix, uid string, isSnapshot bool) string {
if prefix == "" {
if isSnapshot {
prefix = defaultSnapshotNamingPrefix
} else {
prefix = defaultVolumeNamingPrefix
}
}
return prefix + uid
}
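// For example (hypothetical uuid value): GetNameForUUID("", uid, false) returns
// "csi-vol-"+uid, GetNameForUUID("", uid, true) returns "csi-snap-"+uid, and any
// non-empty prefix is used as-is, e.g. GetNameForUUID("ceph-", uid, false) returns "ceph-"+uid.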
// SetCSIDirectorySuffix sets the given suffix for the csiDirectory omap
func (cj *CSIJournal) SetCSIDirectorySuffix(suffix string) {
cj.csiDirectory = cj.csiDirectory + "." + suffix
}
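// For example, SetCSIDirectorySuffix("default") on a volume journal changes
// csiDirectory from "csi.volumes" to "csi.volumes.default".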
// SetNamespace sets the namespace in which all RADOS objects would be created
func (cj *CSIJournal) SetNamespace(ns string) {
cj.namespace = ns
}
// ImageData contains image name and stored CSI properties
type ImageData struct {
ImageUUID string
ImagePool string
ImagePoolID int64
ImageAttributes *ImageAttributes
}
/*
CheckReservation checks if the given request name contains a valid reservation
- If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned
- If there is a reservation that is stale (or not fully cleaned up), it is garbage collected using
the UndoReservation call, as appropriate
- If a snapshot is being checked, then its source is matched to the parentName that is provided
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
held, to prevent parallel operations from modifying the state of the omaps for this request name.
Return values:
- string: Contains the UUID that was reserved for the passed in reqName, empty if
there was no reservation found
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials,
journalPool, reqName, namePrefix, parentName, kmsConfig string) (*ImageData, error) {
var (
snapSource bool
objUUID string
savedImagePool string
savedImagePoolID int64 = InvalidPoolID
)
if parentName != "" {
if cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is empty")
return nil, err
}
snapSource = true
}
// check if request name is already part of the directory omap
objUUIDAndPool, err := GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
// error should specifically be not found, for volume to be absent, any other error
// is not conclusive, and we should not proceed
switch err.(type) {
case ErrKeyNotFound, ErrPoolNotFound:
return nil, nil
}
return nil, err
}
// check if the value is encoded as a UUID only (no pool ID prefix)
if len(objUUIDAndPool) == uuidEncodedLength {
objUUID = objUUIDAndPool
savedImagePool = journalPool
} else { // check poolID/UUID encoding; extract the vol UUID and pool name
var buf64 []byte
components := strings.Split(objUUIDAndPool, "/")
if len(components) != 2 {
return nil, fmt.Errorf("unable to parse omap value (%s) into pool ID and volume UUID", objUUIDAndPool)
}
objUUID = components[1]
savedImagePoolIDStr := components[0]
buf64, err = hex.DecodeString(savedImagePoolIDStr)
if err != nil {
return nil, err
}
savedImagePoolID = int64(binary.BigEndian.Uint64(buf64))
savedImagePool, err = GetPoolName(ctx, monitors, cr, savedImagePoolID)
if err != nil {
if _, ok := err.(ErrPoolNotFound); ok {
err = cj.UndoReservation(ctx, monitors, cr, journalPool, "", "", reqName)
}
return nil, err
}
}
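// Worked example of the poolID/UUID encoding above (hypothetical values): a pool ID
// of 2 is stored as the big-endian hex string "0000000000000002", so an omap value
// of "0000000000000002/<uuid>" decodes to savedImagePoolID == 2, with the volume
// UUID taken from after the "/".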
savedImageAttributes, err := cj.GetImageAttributes(ctx, monitors, cr, savedImagePool,
objUUID, snapSource)
if err != nil {
// error should specifically be not found, for image to be absent, any other error
// is not conclusive, and we should not proceed
if _, ok := err.(ErrKeyNotFound); ok {
err = cj.UndoReservation(ctx, monitors, cr, journalPool, savedImagePool,
cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName)
}
return nil, err
}
// check if UUID key points back to the request name
if savedImageAttributes.RequestName != reqName {
// NOTE: This should never be possible, hence no cleanup, but log error
// and return, as cleanup may need to occur manually!
return nil, fmt.Errorf("internal state inconsistent, omap names mismatch,"+
" request name (%s) volume UUID (%s) volume omap name (%s)",
reqName, objUUID, savedImageAttributes.RequestName)
}
if kmsConfig != "" {
if savedImageAttributes.KmsID != kmsConfig {
return nil, fmt.Errorf("internal state inconsistent, omap encryption KMS"+
" mismatch, request KMS (%s) volume UUID (%s) volume omap KMS (%s)",
kmsConfig, objUUID, savedImageAttributes.KmsID)
}
}
// TODO: this check is skipped to avoid an extra poolID to poolname lookup; a mismatch should also never happen!
// check if journal pool points back to the passed in journal pool
// if savedJournalPoolID != journalPoolID {
if snapSource {
// check if source UUID key points back to the parent volume passed in
if savedImageAttributes.SourceName != parentName {
// NOTE: This can happen if there is a snapname conflict, and we already have a snapshot
// with the same name pointing to a different source UUID
err = fmt.Errorf("snapname points to different volume, request name (%s)"+
" source name (%s) saved source name (%s)",
reqName, parentName, savedImageAttributes.SourceName)
return nil, ErrSnapNameConflict{reqName, err}
}
}
imageData := &ImageData{
ImageUUID: objUUID,
ImagePool: savedImagePool,
ImagePoolID: savedImagePoolID,
ImageAttributes: savedImageAttributes,
}
return imageData, nil
}
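// Illustrative interpretation of the return values: a nil error with a nil *ImageData
// means no reservation exists yet and a new one may be taken; a nil error with a
// non-nil *ImageData means reqName already maps to imageData.ImageUUID in
// imageData.ImagePool, and that reservation can be reused.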
/*
UndoReservation undoes a reservation, in the reverse order of ReserveName
- The UUID directory is cleaned up before the VolName key in the csiDirectory is cleaned up
NOTE: Ensure that the Ceph volume (image or FS subvolume) backing the reservation is cleaned up
prior to cleaning up the reservation
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
held, to prevent parallel operations from modifying the state of the omaps for this request name.
Input arguments:
- csiJournalPool: Pool name that holds the CSI request name based journal
- volJournalPool: Pool name that holds the image/subvolume and the per-image journal (may be
different if image is created in a topology constrained pool)
*/
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials,
csiJournalPool, volJournalPool, volName, reqName string) error {
// delete volume UUID omap (first, inverse of create order)
if volName != "" {
if len(volName) < uuidEncodedLength {
return fmt.Errorf("unable to parse UUID from %s, too short", volName)
}
imageUUID := volName[len(volName)-uuidEncodedLength:]
if valid := uuid.Parse(imageUUID); valid == nil {
return fmt.Errorf("failed parsing UUID in %s", volName)
}
err := RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
if err != nil {
if _, ok := err.(ErrObjectNotFound); !ok {
klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
return err
}
}
}
// delete the request name key (last, inverse of create order)
err := RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
return err
}
return err
}
// reserveOMapName creates an omap with the passed in oMapNamePrefix and a generated <uuid>.
// It ensures the generated omap name does not already exist; if conflicts are detected, a set
// number of retries with newer uuids are attempted before returning an error.
func reserveOMapName(ctx context.Context, monitors string, cr *Credentials, pool, namespace, oMapNamePrefix string) (string, error) {
var iterUUID string
maxAttempts := 5
attempt := 1
for attempt <= maxAttempts {
// generate a uuid for the image name
iterUUID = uuid.NewUUID().String()
err := CreateObject(ctx, monitors, cr, pool, namespace, oMapNamePrefix+iterUUID)
if err != nil {
if _, ok := err.(ErrObjectExists); ok {
attempt++
// try again with a different uuid, for maxAttempts tries
klog.V(4).Infof(Log(ctx, "uuid (%s) conflict detected, retrying (attempt %d of %d)"),
iterUUID, attempt, maxAttempts)
continue
}
return "", err
}
return iterUUID, nil
}
return "", errors.New("uuid conflicts exceeds retry threshold")
}
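// For example (hypothetical pool name, sketch only):
//   uuid, err := reserveOMapName(ctx, monitors, cr, "replicapool", "", "csi.volume.")
//   // on success, an object named "csi.volume."+uuid now exists in "replicapool"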
/*
ReserveName adds the respective entries to the csiDirectory omaps, after generating a target
UUIDDirectory for use. It further updates the UUIDDirectory omaps, to store back
pointers to the CSI generated request names.
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
held, to prevent parallel operations from modifying the state of the omaps for this request name.
Input arguments:
- journalPool: Pool where the CSI journal is stored (may be different from the pool where the
image/subvolume is created due to topology constraints)
- journalPoolID: pool ID of the journalPool
- imagePool: Pool where the image/subvolume is created
- imagePoolID: pool ID of the imagePool
- reqName: Name of the volume request received
- namePrefix: Prefix to use when generating the image/subvolume name (suffix is an auto-generated UUID)
- parentName: Name of the parent image/subvolume if reservation is for a snapshot (optional)
- kmsConf: Name of the key management service used to encrypt the image (optional)
Return values:
- string: Contains the UUID that was reserved for the passed in reqName
- string: Contains the image name that was reserved for the passed in reqName
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials,
journalPool string, journalPoolID int64,
imagePool string, imagePoolID int64,
reqName, namePrefix, parentName, kmsConf string) (string, string, error) {
// TODO: Take in-arg as ImageAttributes?
var (
snapSource bool
nameKeyVal string
)
if parentName != "" {
if cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is empty")
return "", "", err
}
snapSource = true
}
// Create the UUID based omap first, to reserve the same and avoid conflicts
// NOTE: If any service loss occurs post creation of the UUID directory, and before
// setting the request name key (csiNameKey) to point back to the UUID directory, the
// UUID directory key will be leaked
volUUID, err := reserveOMapName(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix)
if err != nil {
return "", "", err
}
imageName := cj.GetNameForUUID(namePrefix, volUUID, snapSource)
// Create request name (csiNameKey) key in csiDirectory and store the UUID based
// volume name and optionally the image pool location into it
if journalPool != imagePool && imagePoolID != InvalidPoolID {
buf64 := make([]byte, 8)
binary.BigEndian.PutUint64(buf64, uint64(imagePoolID))
poolIDEncodedHex := hex.EncodeToString(buf64)
nameKeyVal = poolIDEncodedHex + "/" + volUUID
} else {
nameKeyVal = volUUID
}
err = SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, nameKeyVal)
if err != nil {
return "", "", err
}
defer func() {
if err != nil {
klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName)
// note: UndoReservation expects (csiJournalPool, volJournalPool), i.e. the CSI
// journal pool first, followed by the image pool holding the UUID directory
errDefer := cj.UndoReservation(ctx, monitors, cr, journalPool, imagePool, imageName, reqName)
if errDefer != nil {
klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
}
}
}()
// NOTE: UUID directory is stored on the same pool as the image, helps determine image attributes
// and also the CSI journal pool, when only the VolumeID is passed in (e.g. DeleteVolume/DeleteSnapshot,
// VolID during CreateSnapshot).
// Update UUID directory to store CSI request name
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiNameKey, reqName)
if err != nil {
return "", "", err
}
// Update UUID directory to store image name
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiImageKey, imageName)
if err != nil {
return "", "", err
}
// Update UUID directory to store encryption values
if kmsConf != "" {
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.encryptKMSKey, kmsConf)
if err != nil {
return "", "", err
}
}
if journalPool != imagePool && journalPoolID != InvalidPoolID {
buf64 := make([]byte, 8)
binary.BigEndian.PutUint64(buf64, uint64(journalPoolID))
journalPoolIDStr := hex.EncodeToString(buf64)
// Update UUID directory to store CSI journal pool name (prefer ID instead of name to be pool rename proof)
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiJournalPool, journalPoolIDStr)
if err != nil {
return "", "", err
}
}
if snapSource {
// Update UUID directory to store source volume UUID in case of snapshots
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.cephSnapSourceKey, parentName)
if err != nil {
return "", "", err
}
}
return volUUID, imageName, nil
}
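// Putting the pieces together, a CreateVolume-style caller would drive the journal
// roughly as in this sketch (hypothetical variable names, error handling elided):
//   imageData, err := volJournal.CheckReservation(ctx, monitors, cr,
//       journalPool, reqName, namePrefix, "", kmsConf)
//   if imageData == nil && err == nil {
//       volUUID, imageName, err := volJournal.ReserveName(ctx, monitors, cr,
//           journalPool, journalPoolID, imagePool, imagePoolID,
//           reqName, namePrefix, "", kmsConf)
//       // ... create the rbd image/subvolume named imageName ...
//       // if that creation fails, release the journal entries again:
//       //   _ = volJournal.UndoReservation(ctx, monitors, cr,
//       //       journalPool, imagePool, imageName, reqName)
//   }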
// ImageAttributes contains all CSI stored image attributes, typically as OMap keys
type ImageAttributes struct {
RequestName string // Contains the request name for the passed in UUID
SourceName string // Contains the parent image name for the passed in UUID, if it is a snapshot
ImageName string // Contains the image or subvolume name for the passed in UUID
KmsID string // Contains encryption KMS, if it is an encrypted image
JournalPoolID int64 // Pool ID of the CSI journal pool, stored in big endian format (on-disk data)
}
// GetImageAttributes fetches all keys and their values, from a UUID directory, returning ImageAttributes structure
func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) {
var (
err error
imageAttributes = &ImageAttributes{}
)
if snapSource && cj.cephSnapSourceKey == "" {
err = errors.New("invalid request, cephSnapSourceKey is empty")
return nil, err
}
// TODO: fetch all omap vals in one call, rather than making multiple listomapvals calls
imageAttributes.RequestName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
if err != nil {
return nil, err
}
// the image key was added in a later release, so volumes created before a
// ceph-csi upgrade may not have this key set
imageAttributes.ImageName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
if err != nil {
// if the key was not found, assume the default key + UUID
// otherwise return error
switch err.(type) {
default:
return nil, err
case ErrKeyNotFound, ErrPoolNotFound:
}
if snapSource {
imageAttributes.ImageName = defaultSnapshotNamingPrefix + objectUUID
} else {
imageAttributes.ImageName = defaultVolumeNamingPrefix + objectUUID
}
}
imageAttributes.KmsID, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
if err != nil {
// ErrKeyNotFound means no encryption KMS was used
switch err.(type) {
default:
return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
pool, cj.cephUUIDDirectoryPrefix+objectUUID, err)
case ErrKeyNotFound, ErrPoolNotFound:
}
}
journalPoolIDStr, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool)
if err != nil {
if _, ok := err.(ErrKeyNotFound); !ok {
return nil, err
}
imageAttributes.JournalPoolID = InvalidPoolID
} else {
var buf64 []byte
buf64, err = hex.DecodeString(journalPoolIDStr)
if err != nil {
return nil, err
}
imageAttributes.JournalPoolID = int64(binary.BigEndian.Uint64(buf64))
}
if snapSource {
imageAttributes.SourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
if err != nil {
return nil, err
}
}
return imageAttributes, nil
}