// command is what is sent through the channel to terminate the go routine.
type command string

const (
	// stopCommand is sent through the channel to stop checking.
	stopCommand = command("STOP")
)

// checker contains the state that is shared by the health-checker
// implementations. An implementation embeds checker, calls initDefaults() and
// then replaces runChecker with its own checking loop.
type checker struct {
	// interval contains the time to sleep between health checks.
	interval time.Duration

	// timeout contains the extra delay that is tolerated on top of the
	// interval; when lastUpdate is older than (interval + timeout) the
	// volume is reported as unhealthy.
	timeout time.Duration

	// mutex protects against concurrent access to isRunning, healthy, err
	// and lastUpdate.
	mutex *sync.RWMutex

	// current status
	isRunning  bool
	healthy    bool
	err        error
	lastUpdate time.Time

	// commands is the channel to read commands from; when to stop.
	commands chan command

	// runChecker is the checking loop that start() runs in a go routine. It
	// is expected to return once it receives a command on the commands
	// channel.
	runChecker func()
}

// initDefaults sets all fields of the checker to their initial values. The
// checker starts out healthy, with lastUpdate set to the current time. The
// runChecker placeholder panics, the embedding type has to replace it.
func (c *checker) initDefaults() {
	c.interval = 60 * time.Second
	c.timeout = 15 * time.Second
	c.mutex = &sync.RWMutex{}
	c.isRunning = false
	c.err = nil
	c.healthy = true
	c.lastUpdate = time.Now()
	c.commands = make(chan command)

	c.runChecker = func() {
		panic("BUG: implement runChecker() in the final checker struct")
	}
}

// start runs runChecker() in a new go routine, unless the checker was
// started already.
func (c *checker) start() {
	c.mutex.Lock()
	if c.isRunning {
		c.mutex.Unlock()

		return
	}
	// Mark the checker as running before the go routine is scheduled, so
	// that a second start() can not launch a second go routine.
	c.isRunning = true
	c.mutex.Unlock()

	go c.runChecker()
}

// stop sends the stopCommand to the go routine that runs runChecker(). It
// returns immediately when the checker is not running; without that check the
// send on the unbuffered channel would block forever.
func (c *checker) stop() {
	c.mutex.Lock()
	if !c.isRunning {
		c.mutex.Unlock()

		return
	}
	// Clear the flag first, so that a concurrent stop() does not try to send
	// a second command that nobody will receive.
	c.isRunning = false
	c.mutex.Unlock()

	// The mutex must not be held here: the go routine takes it while
	// recording the result of a check, before it gets back to reading from
	// the channel.
	c.commands <- stopCommand
}

// isHealthy returns the last recorded health status and error. When the last
// successful update is older than (interval + timeout), the checker is marked
// unhealthy and an error describing the delay is returned.
func (c *checker) isHealthy() (bool, error) {
	// A single lock covers the staleness check, the update and the reading
	// of the return values, so that the returned pair is consistent and
	// lastUpdate is never read while the go routine writes it.
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// check for the last update, it should be within
	//
	//   c.lastUpdate < (c.interval + c.timeout)
	//
	// Without such a check, a single slow write/read could trigger actions
	// to recover an unhealthy volume already.
	//
	// It is required to check, in case the write or read in the go routine
	// is blocked.
	delay := time.Since(c.lastUpdate)
	if delay > (c.interval + c.timeout) {
		c.healthy = false
		c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds())
	}

	return c.healthy, c.err
}
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "errors" + "os" + "path" + "time" +) + +type fileChecker struct { + checker + + // filename contains the filename that is used for checking. + filename string +} + +func newFileChecker(dir string) ConditionChecker { + fc := &fileChecker{ + filename: path.Join(dir, "csi-volume-condition.ts"), + } + fc.initDefaults() + + fc.checker.runChecker = func() { + fc.isRunning = true + + ticker := time.NewTicker(fc.interval) + defer ticker.Stop() + + for { + select { + case <-fc.commands: // STOP command received + fc.isRunning = false + + return + case now := <-ticker.C: + err := fc.writeTimestamp(now) + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + ts, err := fc.readTimestamp() + if err != nil { + fc.mutex.Lock() + fc.healthy = false + fc.err = err + fc.mutex.Unlock() + + continue + } + + // verify that the written timestamp is read back + if now.Compare(ts) != 0 { + fc.mutex.Lock() + fc.healthy = false + fc.err = errors.New("timestamp read from file does not match what was written") + fc.mutex.Unlock() + + continue + } + + // run health check, write a timestamp to a file, read it back + fc.mutex.Lock() + fc.healthy = true + fc.err = nil + fc.lastUpdate = ts + fc.mutex.Unlock() + } + } + } + + return fc +} + +// readTimestamp reads the JSON formatted timestamp from the file. +func (fc *fileChecker) readTimestamp() (time.Time, error) { + var ts time.Time + + data, err := os.ReadFile(fc.filename) + if err != nil { + return ts, err + } + + err = ts.UnmarshalJSON(data) + + return ts, err +} + +// writeTimestamp writes the timestamp to the file in JSON format. 
+func (fc *fileChecker) writeTimestamp(ts time.Time) error { + data, err := ts.MarshalJSON() + if err != nil { + return err + } + + //nolint:gosec // allow reading of the timestamp for debugging + return os.WriteFile(fc.filename, data, 0o644) +} diff --git a/internal/health-checker/filechecker_test.go b/internal/health-checker/filechecker_test.go new file mode 100644 index 000000000..3c67c5b84 --- /dev/null +++ b/internal/health-checker/filechecker_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestFileChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +} + +func TestWriteReadTimestamp(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + fc := newFileChecker(volumePath) + checker, ok := fc.(*fileChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", fc) + } + ts := time.Now() + + err := checker.writeTimestamp(ts) + if err != nil { + t.Fatalf("failed to write timestamp: %v", err) + } + + _, err = checker.readTimestamp() + if err != nil { + t.Fatalf("failed to read timestamp: %v", err) + } +} diff --git a/internal/health-checker/manager.go b/internal/health-checker/manager.go new file mode 100644 index 000000000..ed78ec6fa --- /dev/null +++ b/internal/health-checker/manager.go @@ -0,0 +1,208 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "fmt" + "os" + "path/filepath" + "sync" +) + +// CheckerType describes the type of health-check that needs to be done. +type CheckerType uint64 + +const ( + // StatCheckerType uses the stat() syscall to validate volume health. + StatCheckerType = iota + // FileCheckerType writes and reads a timestamp to a file for checking the + // volume health. + FileCheckerType +) + +// Manager provides the API for getting the health status of a volume. The main +// usage is requesting the health status by volumeID. +// +// When the Manager detects that a new volumeID is used for checking, a new +// instance of a ConditionChecker is created for the volumeID on the given +// path, and started. +// +// Once the volumeID is not active anymore (when NodeUnstageVolume is called), +// the ConditionChecker needs to be stopped, which can be done by +// Manager.StopChecker(). +type Manager interface { + // StartChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the publishTargetPath, and + // a unique path for this checker. If the path can be used by multiple + // containers, use the StartSharedChecker function instead. + StartChecker(volumeID, path string, ct CheckerType) error + + // StartSharedChecker starts a health-checker of the requested type for the + // volumeID using the path. The path usually is the stagingTargetPath, and + // can be used for multiple containers. + StartSharedChecker(volumeID, path string, ct CheckerType) error + + StopChecker(volumeID, path string) + StopSharedChecker(volumeID string) + + // IsHealthy locates the checker for the volumeID and path. If no checker + // is found, `true` is returned together with an error message. + // When IsHealthy runs into an internal error, it is assumed that the + // volume is healthy. 
Only when it is confirmed that the volume is + // unhealthy, `false` is returned together with an error message. + IsHealthy(volumeID, path string) (bool, error) +} + +// ConditionChecker describes the interface that a health status reporter needs +// to implement. It is used internally by the Manager only. +type ConditionChecker interface { + // start runs a the health checking function in a new go routine. + start() + + // stop terminates a the health checking function that runs in a go + // routine. + stop() + + // isHealthy returns the status of the volume, without blocking. + isHealthy() (bool, error) +} + +type healthCheckManager struct { + checkers sync.Map // map[volumeID]ConditionChecker +} + +func NewHealthCheckManager() Manager { + return &healthCheckManager{ + checkers: sync.Map{}, + } +} + +func (hcm *healthCheckManager) StartSharedChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, true) +} + +func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerType) error { + return hcm.createChecker(volumeID, path, ct, false) +} + +// createChecker decides based on the CheckerType what checker to start for +// the volume. +func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error { + switch ct { + case FileCheckerType: + return hcm.startFileChecker(volumeID, path, shared) + case StatCheckerType: + return hcm.startStatChecker(volumeID, path, shared) + } + + return nil +} + +// startFileChecker initializes the fileChecker and starts it. 
+func (hcm *healthCheckManager) startFileChecker(volumeID, path string, shared bool) error { + workdir := filepath.Join(path, ".csi") + err := os.Mkdir(workdir, 0o755) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to created workdir %q for health-checker: %w", workdir, err) + } + + cc := newFileChecker(workdir) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startStatChecker initializes the statChecker and starts it. +func (hcm *healthCheckManager) startStatChecker(volumeID, path string, shared bool) error { + cc := newStatChecker(path) + + return hcm.startChecker(cc, volumeID, path, shared) +} + +// startChecker adds the checker to its map and starts it. +// Shared checkers are key'd by their volumeID, whereas non-shared checkers +// are key'd by theit volumeID+path. +func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path string, shared bool) error { + key := volumeID + if shared { + key = fallbackKey(volumeID, path) + } + + // load the 'old' ConditionChecker if it exists, otherwise store 'cc' + old, ok := hcm.checkers.LoadOrStore(key, cc) + if ok { + // 'old' was loaded, cast it to ConditionChecker + _, ok = old.(ConditionChecker) + if !ok { + return fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + } else { + // 'cc' was stored, start it only once + cc.start() + } + + return nil +} + +func (hcm *healthCheckManager) StopSharedChecker(volumeID string) { + hcm.StopChecker(volumeID, "") +} + +func (hcm *healthCheckManager) StopChecker(volumeID, path string) { + old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path)) + if !ok { + // nothing was loaded, nothing to do + return + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + // failed to cast, should not be possible + return + } + cc.stop() +} + +func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) { + // load the 'old' 
ConditionChecker if it exists + old, ok := hcm.checkers.Load(volumeID) + if !ok { + // try fallback which include an optional (unique) path (usually publishTargetPath) + old, ok = hcm.checkers.Load(fallbackKey(volumeID, path)) + if !ok { + return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID) + } + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + + return cc.isHealthy() +} + +// fallbackKey returns the key for a checker in the map. If the path is empty, +// it is assumed that the key'd checked is shared. +func fallbackKey(volumeID, path string) string { + if path == "" { + return volumeID + } + + return fmt.Sprintf("%s:%s", volumeID, path) +} diff --git a/internal/health-checker/manager_test.go b/internal/health-checker/manager_test.go new file mode 100644 index 000000000..2c44f65b1 --- /dev/null +++ b/internal/health-checker/manager_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package healthchecker + +import ( + "testing" +) + +func TestManager(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopChecker(volumeID, volumePath) +} + +func TestSharedChecker(t *testing.T) { + t.Parallel() + + volumeID := "fake-volume-id" + volumePath := t.TempDir() + mgr := NewHealthCheckManager() + + // expected to have an error in msg + healthy, msg := mgr.IsHealthy(volumeID, volumePath) + if !(healthy && msg != nil) { + t.Error("ConditionChecker was not started yet, did not get an error") + } + + t.Log("start the checker") + err := mgr.StartSharedChecker(volumeID, volumePath, StatCheckerType) + if err != nil { + t.Fatalf("ConditionChecker could not get started: %v", err) + } + + t.Log("check health, should be healthy") + healthy, msg = mgr.IsHealthy(volumeID, volumePath) + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("check health, should be healthy, path is ignored") + healthy, msg = mgr.IsHealthy(volumeID, "different-path") + if !healthy || err != nil { + t.Errorf("volume is unhealthy: %s", msg) + } + + t.Log("stop the checker") + mgr.StopSharedChecker(volumeID) +} diff --git a/internal/health-checker/statchecker.go b/internal/health-checker/statchecker.go new file mode 100644 index 000000000..d5ef69883 --- /dev/null +++ 
b/internal/health-checker/statchecker.go @@ -0,0 +1,70 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "os" + "time" +) + +type statChecker struct { + checker + + // dirname points to the directory that is used for checking. + dirname string +} + +func newStatChecker(dir string) ConditionChecker { + sc := &statChecker{ + dirname: dir, + } + sc.initDefaults() + + sc.checker.runChecker = func() { + sc.isRunning = true + + ticker := time.NewTicker(sc.interval) + defer ticker.Stop() + + for { + select { + case <-sc.commands: // STOP command received + sc.isRunning = false + + return + case now := <-ticker.C: + _, err := os.Stat(sc.dirname) + if err != nil { + sc.mutex.Lock() + sc.healthy = false + sc.err = err + sc.mutex.Unlock() + + continue + } + + sc.mutex.Lock() + sc.healthy = true + sc.err = nil + sc.lastUpdate = now + sc.mutex.Unlock() + } + } + } + + return sc +} diff --git a/internal/health-checker/statchecker_test.go b/internal/health-checker/statchecker_test.go new file mode 100644 index 000000000..4ea247803 --- /dev/null +++ b/internal/health-checker/statchecker_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "testing" + "time" +) + +func TestStatChecker(t *testing.T) { + t.Parallel() + + volumePath := t.TempDir() + sc := newStatChecker(volumePath) + checker, ok := sc.(*statChecker) + if !ok { + t.Errorf("failed to convert fc to *fileChecker: %v", sc) + } + checker.interval = time.Second * 5 + + // start the checker + checker.start() + + // wait a second to get the go routine running + time.Sleep(time.Second) + if !checker.isRunning { + t.Error("checker failed to start") + } + + for i := 0; i < 10; i++ { + // check health, should be healthy + healthy, msg := checker.isHealthy() + if !healthy || msg != nil { + t.Error("volume is unhealthy") + } + + time.Sleep(time.Second) + } + + if !checker.isRunning { + t.Error("runChecker() exited already") + } + + // stop the checker + checker.stop() +}