From dc936f52ab6ca4040ca6a15823972849012af8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mika=C3=ABl=20Cluseau?= Date: Mon, 13 Apr 2026 21:11:07 +0200 Subject: [PATCH] logger: add cgroup option --- Cargo.lock | 11 +++++++ Cargo.toml | 3 +- src/bin/dkl.rs | 8 +++-- src/cgroup.rs | 6 ++-- src/fs.rs | 21 ++++++++++-- src/lib.rs | 10 +++--- src/logger.rs | 89 +++++++++++++++++++++++++++++++++++++++++++------- src/rc.rs | 1 + 8 files changed, 123 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ce0d46..46d8b29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,6 +362,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "signal-hook", "tabled", "thiserror", "tokio", @@ -1480,6 +1481,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.8" diff --git a/Cargo.toml b/Cargo.toml index e76dc4e..1bdd6cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ hex = "0.4.3" human-units = "0.5.3" log = "0.4.27" lz4 = "1.28.1" -nix = { version = "0.31.2", features = ["user"] } +nix = { version = "0.31.2", features = ["process", "signal", "user"] } openssl = "0.10.73" page_size = "0.6.0" reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false } @@ -37,6 +37,7 @@ rust-argon2 = "3.0.0" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_yaml = "0.9.34" +signal-hook = "0.4.4" tabled = "0.20.0" thiserror = "2.0.12" tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] } diff --git a/src/bin/dkl.rs b/src/bin/dkl.rs index e0053b3..e3a1827 100644 --- a/src/bin/dkl.rs +++ b/src/bin/dkl.rs @@ -1,5 +1,5 @@ use clap::{CommandFactory, Parser, Subcommand}; -use eyre::{format_err, Result}; +use eyre::{Result, format_err}; use human_units::Duration; use log::{debug, error}; use std::net::SocketAddr; @@ -39,6 +39,9 @@ enum Command { /// prefix log lines with time & stream #[arg(long)] with_prefix: bool, + /// exec command in this cgroup + #[arg(long)] + cgroup: Option, command: String, args: Vec, }, @@ -125,6 +128,7 @@ async fn main() -> Result<()> { ref log_path, ref log_name, with_prefix, + cgroup, command, args, } => { @@ -136,7 +140,7 @@ async fn main() -> Result<()> { log_name, with_prefix, } - .run(command, &args) + .run(cgroup, command, &args) .await } C::Log { diff --git a/src/cgroup.rs b/src/cgroup.rs index 81232cc..8c0bbfd 100644 --- a/src/cgroup.rs +++ b/src/cgroup.rs @@ -6,10 +6,10 @@ use std::str::FromStr; use crate::{fs, human::Human}; -const CGROUP_ROOT: &str = "/sys/fs/cgroup/"; +pub const ROOT: &str = "/sys/fs/cgroup"; pub async fn ls(parent: Option>, exclude: &[String]) -> fs::Result<()> { - let mut root = PathBuf::from(CGROUP_ROOT); + let mut root = PathBuf::from(ROOT); if let Some(parent) = parent { root = root.join(parent); } @@ -50,8 +50,8 @@ pub async fn ls(parent: Option>, exclude: &[String]) -> fs:: } use tabled::settings::{ - object::{Column, Row}, Alignment, Modify, + object::{Column, Row}, }; let mut table = table.build(); table.with(tabled::settings::Style::psql()); diff --git a/src/fs.rs b/src/fs.rs index 7d449fb..2473763 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -5,16 +5,26 @@ use tokio::sync::mpsc; pub type Result = std::result::Result; +pub use tokio::fs::ReadDir; + #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("{0}: read_dir: {1}")] + #[error("{0}: read dir: {1}")] ReadDir(PathBuf, std::io::Error), + #[error("{0}: exists: {1}")] + Exists(PathBuf, std::io::Error), #[error("{0}: read: {1}")] Read(PathBuf, std::io::Error), #[error("{0}: stat: {1}")] Stat(PathBuf, std::io::Error), + #[error("{0}: create dir: {1}")] + CreateDir(PathBuf, std::io::Error), #[error("{0}: write: {1}")] Write(PathBuf, std::io::Error), + #[error("{0}: remove file: {1}")] + RemoveFile(PathBuf, std::io::Error), + #[error("{0}: symlink: {1}")] + Symlink(PathBuf, std::io::Error), #[error("{0}: {1}")] Other(PathBuf, String), } @@ -28,10 +38,15 @@ macro_rules! wrap_path { }; } -wrap_path!(read_dir -> fs::ReadDir, ReadDir); +wrap_path!(read_dir -> ReadDir, ReadDir); +wrap_path!(try_exists -> bool, Exists); wrap_path!(read -> Vec, Read); wrap_path!(read_to_string -> String, Read); -wrap_path!(write(content: &[u8]) -> (), Write); +wrap_path!(create_dir -> (), CreateDir); +wrap_path!(create_dir_all -> (), CreateDir); +wrap_path!(remove_file -> (), RemoveFile); +wrap_path!(symlink(link_src: impl AsRef) -> (), Symlink); +wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write); pub fn spawn_walk_dir( dir: impl Into + Send + 'static, diff --git a/src/lib.rs b/src/lib.rs index 35cb920..57750e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,13 +1,13 @@ pub mod apply; -pub mod rc; -pub mod cgroup; -pub mod human; pub mod bootstrap; +pub mod cgroup; pub mod dls; pub mod dynlay; pub mod fs; +pub mod human; pub mod logger; pub mod proxy; +pub mod rc; #[derive(Debug, Default, serde::Deserialize, serde::Serialize)] pub struct Config { @@ -86,11 +86,11 @@ impl Config { } pub fn base64_decode(s: &str) -> Result, base64::DecodeError> { - use base64::{prelude::BASE64_STANDARD_NO_PAD as B64, Engine as _}; + use base64::{Engine as _, prelude::BASE64_STANDARD_NO_PAD as B64}; B64.decode(s.trim_end_matches('=')) } pub fn base64_encode(b: &[u8]) -> String { - use base64::{prelude::BASE64_STANDARD as B64, Engine as _}; + use base64::{Engine as _, prelude::BASE64_STANDARD as B64}; B64.encode(b) } diff --git a/src/logger.rs b/src/logger.rs index 18502d0..d6670a8 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -5,13 +5,15 @@ use log::{debug, error, warn}; use std::path::{Path, PathBuf}; use std::process::Stdio; use tokio::{ - fs::{self, File}, + fs::File, io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, process, sync::mpsc, time::{Duration, sleep}, }; +use crate::{cgroup, fs}; + pub type Timestamp = chrono::DateTime; const TS_FORMAT: &str = "%Y%m%d_%H"; @@ -27,7 +29,7 @@ pub struct Logger<'t> { } impl<'t> Logger<'t> { - pub async fn run(&self, command: &str, args: &[String]) -> Result<()> { + pub async fn run(&self, cgroup: Option, command: &str, args: &[String]) -> Result<()> { // make sure we can at least open the log before starting the command let archives_path = &format!("{path}/archives", path = self.log_path); (fs::create_dir_all(archives_path).await) @@ -45,20 +47,57 @@ impl<'t> Logger<'t> { )); // start the command - let mut child = process::Command::new(command) - .args(args) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; + let mut cmd = process::Command::new(command); + cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); + if let Some(cgroup) = cgroup.as_deref() { + let mut cg_path = PathBuf::from(cgroup::ROOT); + + let mut parts = cgroup.split('/').chain([self.log_name].into_iter()); + let mut part = parts.next().unwrap(); // 1 element guaranteed + + loop { + cg_path.push(part); + + use std::io::ErrorKind as K; + match tokio::fs::create_dir(&cg_path).await { + Ok(_) => log::debug!("created dir {}", cg_path.display()), + Err(e) if e.kind() == K::AlreadyExists => { + log::debug!("existing dir {}", cg_path.display()) + } + Err(e) => Err(e)?, + } + + let Some(next_part) = parts.next() else { + break; + }; + + let control_file = &cg_path.join("cgroup.subtree_control"); + log::debug!("control file {}", control_file.display()); + tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?; + + part = next_part; + } + + let procs_file = cg_path.join("cgroup.procs"); + log::debug!("procs file {}", procs_file.display()); + fs::write(&procs_file, b"0").await?; + } + + let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); - let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); + // forward signals + if let Some(child_pid) = child.id() { + std::thread::spawn(move || forward_signals_to(child_pid as i32)); + } // handle output + let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); + loop { tokio::select!( r = rx.recv() => { @@ -78,13 +117,14 @@ impl<'t> Logger<'t> { ); } + let status = child.wait().await?; + // finalize while let Err(e) = current_log.flush().await { error!("final log flush failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } - let status = child.wait().await?; std::process::exit(status.code().unwrap_or(-1)); } @@ -158,6 +198,29 @@ impl<'t> Logger<'t> { } } +fn forward_signals_to(pid: i32) { + use nix::{ + sys::signal::{Signal, kill}, + unistd::Pid, + }; + use signal_hook::{consts::*, iterator::Signals}; + + log::debug!("forwarding signals to pid {pid}"); + + let mut signals = Signals::new([ + SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM, + ]) + .expect("signal setup failed"); + + let pid = Pid::from_raw(pid); + for signal in &mut signals { + let Ok(signal) = Signal::try_from(signal) else { + continue; + }; + let _ = kill(pid, signal); + } +} + struct LogItem { stream_name: &'static str, ts: chrono::DateTime, @@ -272,14 +335,16 @@ pub fn parse_ts(ts: &str) -> std::result::Result Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) } -pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result> { +pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result> { let mut dir = PathBuf::from(log_path); dir.push("archives"); let mut entries = Vec::new(); - let mut read_dir = fs::read_dir(dir).await?; + let mut read_dir = fs::read_dir(&dir).await?; - while let Some(entry) = read_dir.next_entry().await? { + while let Some(entry) = + (read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))? + { let file_name = entry.file_name(); let Some(file_name) = file_name.to_str() else { continue; diff --git a/src/rc.rs b/src/rc.rs index e69de29..8b13789 100644 --- a/src/rc.rs +++ b/src/rc.rs @@ -0,0 +1 @@ +