util: add health-checker for periodic filesystem checks

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos 2023-09-13 16:02:16 +02:00 committed by mergify[bot]
parent 95149642cb
commit 54fc65a561
7 changed files with 730 additions and 0 deletions

View File

@ -0,0 +1,107 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"fmt"
"sync"
"time"
)
// command is what is sent through the channel to terminate the go routine.
type command string
const (
// stopCommand is sent through the channel to stop checking.
stopCommand = command("STOP")
)
type checker struct {
// interval contains the time to sleep between health checks.
interval time.Duration
// timeout contains the delay (interval + timeout)
timeout time.Duration
// mutex protects against concurrent access to healty, err and
// lastUpdate
mutex *sync.RWMutex
// current status
isRunning bool
healthy bool
err error
lastUpdate time.Time
// commands is the channel to read commands from; when to stop.
commands chan command
runChecker func()
}
func (c *checker) initDefaults() {
c.interval = 60 * time.Second
c.timeout = 15 * time.Second
c.mutex = &sync.RWMutex{}
c.isRunning = false
c.err = nil
c.healthy = true
c.lastUpdate = time.Now()
c.commands = make(chan command)
c.runChecker = func() {
panic("BUG: implement runChecker() in the final checker struct")
}
}
func (c *checker) start() {
if c.isRunning {
return
}
go c.runChecker()
}
func (c *checker) stop() {
c.commands <- stopCommand
}
func (c *checker) isHealthy() (bool, error) {
// check for the last update, it should be within
//
// c.lastUpdate < (c.interval + c.timeout)
//
// Without such a check, a single slow write/read could trigger actions
// to recover an unhealthy volume already.
//
// It is required to check, in case the write or read in the go routine
// is blocked.
delay := time.Since(c.lastUpdate)
if delay > (c.interval + c.timeout) {
c.mutex.Lock()
c.healthy = false
c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds())
c.mutex.Unlock()
}
// read lock to get consistency between the return values
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.healthy, c.err
}

View File

@ -0,0 +1,118 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"errors"
"os"
"path"
"time"
)
type fileChecker struct {
checker
// filename contains the filename that is used for checking.
filename string
}
func newFileChecker(dir string) ConditionChecker {
fc := &fileChecker{
filename: path.Join(dir, "csi-volume-condition.ts"),
}
fc.initDefaults()
fc.checker.runChecker = func() {
fc.isRunning = true
ticker := time.NewTicker(fc.interval)
defer ticker.Stop()
for {
select {
case <-fc.commands: // STOP command received
fc.isRunning = false
return
case now := <-ticker.C:
err := fc.writeTimestamp(now)
if err != nil {
fc.mutex.Lock()
fc.healthy = false
fc.err = err
fc.mutex.Unlock()
continue
}
ts, err := fc.readTimestamp()
if err != nil {
fc.mutex.Lock()
fc.healthy = false
fc.err = err
fc.mutex.Unlock()
continue
}
// verify that the written timestamp is read back
if now.Compare(ts) != 0 {
fc.mutex.Lock()
fc.healthy = false
fc.err = errors.New("timestamp read from file does not match what was written")
fc.mutex.Unlock()
continue
}
// run health check, write a timestamp to a file, read it back
fc.mutex.Lock()
fc.healthy = true
fc.err = nil
fc.lastUpdate = ts
fc.mutex.Unlock()
}
}
}
return fc
}
// readTimestamp reads the JSON formatted timestamp from the file.
func (fc *fileChecker) readTimestamp() (time.Time, error) {
var ts time.Time
data, err := os.ReadFile(fc.filename)
if err != nil {
return ts, err
}
err = ts.UnmarshalJSON(data)
return ts, err
}
// writeTimestamp writes the timestamp to the file in JSON format.
func (fc *fileChecker) writeTimestamp(ts time.Time) error {
data, err := ts.MarshalJSON()
if err != nil {
return err
}
//nolint:gosec // allow reading of the timestamp for debugging
return os.WriteFile(fc.filename, data, 0o644)
}

View File

@ -0,0 +1,82 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"testing"
"time"
)
func TestFileChecker(t *testing.T) {
t.Parallel()
volumePath := t.TempDir()
fc := newFileChecker(volumePath)
checker, ok := fc.(*fileChecker)
if !ok {
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
}
checker.interval = time.Second * 5
// start the checker
checker.start()
// wait a second to get the go routine running
time.Sleep(time.Second)
if !checker.isRunning {
t.Error("checker failed to start")
}
for i := 0; i < 10; i++ {
// check health, should be healthy
healthy, msg := checker.isHealthy()
if !healthy || msg != nil {
t.Error("volume is unhealthy")
}
time.Sleep(time.Second)
}
if !checker.isRunning {
t.Error("runChecker() exited already")
}
// stop the checker
checker.stop()
}
func TestWriteReadTimestamp(t *testing.T) {
t.Parallel()
volumePath := t.TempDir()
fc := newFileChecker(volumePath)
checker, ok := fc.(*fileChecker)
if !ok {
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
}
ts := time.Now()
err := checker.writeTimestamp(ts)
if err != nil {
t.Fatalf("failed to write timestamp: %v", err)
}
_, err = checker.readTimestamp()
if err != nil {
t.Fatalf("failed to read timestamp: %v", err)
}
}

View File

@ -0,0 +1,208 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"fmt"
"os"
"path/filepath"
"sync"
)
// CheckerType describes the type of health-check that needs to be done.
type CheckerType uint64
const (
// StatCheckerType uses the stat() syscall to validate volume health.
StatCheckerType = iota
// FileCheckerType writes and reads a timestamp to a file for checking the
// volume health.
FileCheckerType
)
// Manager provides the API for getting the health status of a volume. The main
// usage is requesting the health status by volumeID.
//
// When the Manager detects that a new volumeID is used for checking, a new
// instance of a ConditionChecker is created for the volumeID on the given
// path, and started.
//
// Once the volumeID is not active anymore (when NodeUnstageVolume is called),
// the ConditionChecker needs to be stopped, which can be done by
// Manager.StopChecker().
type Manager interface {
// StartChecker starts a health-checker of the requested type for the
// volumeID using the path. The path usually is the publishTargetPath, and
// a unique path for this checker. If the path can be used by multiple
// containers, use the StartSharedChecker function instead.
StartChecker(volumeID, path string, ct CheckerType) error
// StartSharedChecker starts a health-checker of the requested type for the
// volumeID using the path. The path usually is the stagingTargetPath, and
// can be used for multiple containers.
StartSharedChecker(volumeID, path string, ct CheckerType) error
StopChecker(volumeID, path string)
StopSharedChecker(volumeID string)
// IsHealthy locates the checker for the volumeID and path. If no checker
// is found, `true` is returned together with an error message.
// When IsHealthy runs into an internal error, it is assumed that the
// volume is healthy. Only when it is confirmed that the volume is
// unhealthy, `false` is returned together with an error message.
IsHealthy(volumeID, path string) (bool, error)
}
// ConditionChecker describes the interface that a health status reporter needs
// to implement. It is used internally by the Manager only.
type ConditionChecker interface {
// start runs a the health checking function in a new go routine.
start()
// stop terminates a the health checking function that runs in a go
// routine.
stop()
// isHealthy returns the status of the volume, without blocking.
isHealthy() (bool, error)
}
type healthCheckManager struct {
checkers sync.Map // map[volumeID]ConditionChecker
}
func NewHealthCheckManager() Manager {
return &healthCheckManager{
checkers: sync.Map{},
}
}
func (hcm *healthCheckManager) StartSharedChecker(volumeID, path string, ct CheckerType) error {
return hcm.createChecker(volumeID, path, ct, true)
}
func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerType) error {
return hcm.createChecker(volumeID, path, ct, false)
}
// createChecker decides based on the CheckerType what checker to start for
// the volume.
func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error {
switch ct {
case FileCheckerType:
return hcm.startFileChecker(volumeID, path, shared)
case StatCheckerType:
return hcm.startStatChecker(volumeID, path, shared)
}
return nil
}
// startFileChecker initializes the fileChecker and starts it.
func (hcm *healthCheckManager) startFileChecker(volumeID, path string, shared bool) error {
workdir := filepath.Join(path, ".csi")
err := os.Mkdir(workdir, 0o755)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to created workdir %q for health-checker: %w", workdir, err)
}
cc := newFileChecker(workdir)
return hcm.startChecker(cc, volumeID, path, shared)
}
// startStatChecker initializes the statChecker and starts it.
func (hcm *healthCheckManager) startStatChecker(volumeID, path string, shared bool) error {
cc := newStatChecker(path)
return hcm.startChecker(cc, volumeID, path, shared)
}
// startChecker adds the checker to its map and starts it.
// Shared checkers are key'd by their volumeID, whereas non-shared checkers
// are key'd by theit volumeID+path.
func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path string, shared bool) error {
key := volumeID
if shared {
key = fallbackKey(volumeID, path)
}
// load the 'old' ConditionChecker if it exists, otherwise store 'cc'
old, ok := hcm.checkers.LoadOrStore(key, cc)
if ok {
// 'old' was loaded, cast it to ConditionChecker
_, ok = old.(ConditionChecker)
if !ok {
return fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
}
} else {
// 'cc' was stored, start it only once
cc.start()
}
return nil
}
func (hcm *healthCheckManager) StopSharedChecker(volumeID string) {
hcm.StopChecker(volumeID, "")
}
func (hcm *healthCheckManager) StopChecker(volumeID, path string) {
old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path))
if !ok {
// nothing was loaded, nothing to do
return
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
// failed to cast, should not be possible
return
}
cc.stop()
}
func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) {
// load the 'old' ConditionChecker if it exists
old, ok := hcm.checkers.Load(volumeID)
if !ok {
// try fallback which include an optional (unique) path (usually publishTargetPath)
old, ok = hcm.checkers.Load(fallbackKey(volumeID, path))
if !ok {
return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID)
}
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
}
return cc.isHealthy()
}
// fallbackKey returns the key for a checker in the map. If the path is empty,
// it is assumed that the key'd checked is shared.
func fallbackKey(volumeID, path string) string {
if path == "" {
return volumeID
}
return fmt.Sprintf("%s:%s", volumeID, path)
}

View File

@ -0,0 +1,85 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"testing"
)
func TestManager(t *testing.T) {
t.Parallel()
volumeID := "fake-volume-id"
volumePath := t.TempDir()
mgr := NewHealthCheckManager()
// expected to have an error in msg
healthy, msg := mgr.IsHealthy(volumeID, volumePath)
if !(healthy && msg != nil) {
t.Error("ConditionChecker was not started yet, did not get an error")
}
t.Log("start the checker")
err := mgr.StartChecker(volumeID, volumePath, StatCheckerType)
if err != nil {
t.Fatalf("ConditionChecker could not get started: %v", err)
}
t.Log("check health, should be healthy")
healthy, msg = mgr.IsHealthy(volumeID, volumePath)
if !healthy || err != nil {
t.Errorf("volume is unhealthy: %s", msg)
}
t.Log("stop the checker")
mgr.StopChecker(volumeID, volumePath)
}
func TestSharedChecker(t *testing.T) {
t.Parallel()
volumeID := "fake-volume-id"
volumePath := t.TempDir()
mgr := NewHealthCheckManager()
// expected to have an error in msg
healthy, msg := mgr.IsHealthy(volumeID, volumePath)
if !(healthy && msg != nil) {
t.Error("ConditionChecker was not started yet, did not get an error")
}
t.Log("start the checker")
err := mgr.StartSharedChecker(volumeID, volumePath, StatCheckerType)
if err != nil {
t.Fatalf("ConditionChecker could not get started: %v", err)
}
t.Log("check health, should be healthy")
healthy, msg = mgr.IsHealthy(volumeID, volumePath)
if !healthy || err != nil {
t.Errorf("volume is unhealthy: %s", msg)
}
t.Log("check health, should be healthy, path is ignored")
healthy, msg = mgr.IsHealthy(volumeID, "different-path")
if !healthy || err != nil {
t.Errorf("volume is unhealthy: %s", msg)
}
t.Log("stop the checker")
mgr.StopSharedChecker(volumeID)
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"os"
"time"
)
type statChecker struct {
checker
// dirname points to the directory that is used for checking.
dirname string
}
func newStatChecker(dir string) ConditionChecker {
sc := &statChecker{
dirname: dir,
}
sc.initDefaults()
sc.checker.runChecker = func() {
sc.isRunning = true
ticker := time.NewTicker(sc.interval)
defer ticker.Stop()
for {
select {
case <-sc.commands: // STOP command received
sc.isRunning = false
return
case now := <-ticker.C:
_, err := os.Stat(sc.dirname)
if err != nil {
sc.mutex.Lock()
sc.healthy = false
sc.err = err
sc.mutex.Unlock()
continue
}
sc.mutex.Lock()
sc.healthy = true
sc.err = nil
sc.lastUpdate = now
sc.mutex.Unlock()
}
}
}
return sc
}

View File

@ -0,0 +1,60 @@
/*
Copyright 2023 ceph-csi authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthchecker
import (
"testing"
"time"
)
func TestStatChecker(t *testing.T) {
t.Parallel()
volumePath := t.TempDir()
sc := newStatChecker(volumePath)
checker, ok := sc.(*statChecker)
if !ok {
t.Errorf("failed to convert fc to *fileChecker: %v", sc)
}
checker.interval = time.Second * 5
// start the checker
checker.start()
// wait a second to get the go routine running
time.Sleep(time.Second)
if !checker.isRunning {
t.Error("checker failed to start")
}
for i := 0; i < 10; i++ {
// check health, should be healthy
healthy, msg := checker.isHealthy()
if !healthy || msg != nil {
t.Error("volume is unhealthy")
}
time.Sleep(time.Second)
}
if !checker.isRunning {
t.Error("runChecker() exited already")
}
// stop the checker
checker.stop()
}