logger: add cgroup option

This commit is contained in:
Mikaël Cluseau
2026-04-13 21:11:07 +02:00
parent 33fcfbd197
commit dc936f52ab
8 changed files with 123 additions and 26 deletions

View File

@@ -5,13 +5,15 @@ use log::{debug, error, warn};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::{
fs::{self, File},
fs::File,
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
process,
sync::mpsc,
time::{Duration, sleep},
};
use crate::{cgroup, fs};
pub type Timestamp = chrono::DateTime<Utc>;
const TS_FORMAT: &str = "%Y%m%d_%H";
@@ -27,7 +29,7 @@ pub struct Logger<'t> {
}
impl<'t> Logger<'t> {
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
// make sure we can at least open the log before starting the command
let archives_path = &format!("{path}/archives", path = self.log_path);
(fs::create_dir_all(archives_path).await)
@@ -45,20 +47,57 @@ impl<'t> Logger<'t> {
));
// start the command
let mut child = process::Command::new(command)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let mut cmd = process::Command::new(command);
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
if let Some(cgroup) = cgroup.as_deref() {
let mut cg_path = PathBuf::from(cgroup::ROOT);
let mut parts = cgroup.split('/').chain([self.log_name].into_iter());
let mut part = parts.next().unwrap(); // 1 element guaranteed
loop {
cg_path.push(part);
use std::io::ErrorKind as K;
match tokio::fs::create_dir(&cg_path).await {
Ok(_) => log::debug!("created dir {}", cg_path.display()),
Err(e) if e.kind() == K::AlreadyExists => {
log::debug!("existing dir {}", cg_path.display())
}
Err(e) => Err(e)?,
}
let Some(next_part) = parts.next() else {
break;
};
let control_file = &cg_path.join("cgroup.subtree_control");
log::debug!("control file {}", control_file.display());
tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?;
part = next_part;
}
let procs_file = cg_path.join("cgroup.procs");
log::debug!("procs file {}", procs_file.display());
fs::write(&procs_file, b"0").await?;
}
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
// forward signals
if let Some(child_pid) = child.id() {
std::thread::spawn(move || forward_signals_to(child_pid as i32));
}
// handle output
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
loop {
tokio::select!(
r = rx.recv() => {
@@ -78,13 +117,14 @@ impl<'t> Logger<'t> {
);
}
let status = child.wait().await?;
// finalize
while let Err(e) = current_log.flush().await {
error!("final log flush failed: {e}");
sleep(WRITE_RETRY_DELAY).await;
}
let status = child.wait().await?;
std::process::exit(status.code().unwrap_or(-1));
}
@@ -158,6 +198,29 @@ impl<'t> Logger<'t> {
}
}
fn forward_signals_to(pid: i32) {
use nix::{
sys::signal::{Signal, kill},
unistd::Pid,
};
use signal_hook::{consts::*, iterator::Signals};
log::debug!("forwarding signals to pid {pid}");
let mut signals = Signals::new([
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
])
.expect("signal setup failed");
let pid = Pid::from_raw(pid);
for signal in &mut signals {
let Ok(signal) = Signal::try_from(signal) else {
continue;
};
let _ = kill(pid, signal);
}
}
struct LogItem {
stream_name: &'static str,
ts: chrono::DateTime<chrono::Utc>,
@@ -272,14 +335,16 @@ pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError>
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
}
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
let mut dir = PathBuf::from(log_path);
dir.push("archives");
let mut entries = Vec::new();
let mut read_dir = fs::read_dir(dir).await?;
let mut read_dir = fs::read_dir(&dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
while let Some(entry) =
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
{
let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else {
continue;