From f3b3a9b9c7d55386cdf686cfcc1e876310f9dc2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mika=C3=ABl=20Cluseau?= Date: Tue, 14 Apr 2026 10:52:36 +0200 Subject: [PATCH] logger: cleanup --- Cargo.lock | 1 + Cargo.toml | 1 + src/cgroup.rs | 9 +++ src/logger.rs | 176 +++++++++++++++++++++++++++++--------------------- 4 files changed, 115 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46d8b29..0d23e67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,6 +353,7 @@ dependencies = [ "human-units", "log", "lz4", + "memchr", "nix", "openssl", "page_size", diff --git a/Cargo.toml b/Cargo.toml index 1bdd6cd..832e3f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ hex = "0.4.3" human-units = "0.5.3" log = "0.4.27" lz4 = "1.28.1" +memchr = "2.8.0" nix = { version = "0.31.2", features = ["process", "signal", "user"] } openssl = "0.10.73" page_size = "0.6.0" diff --git a/src/cgroup.rs b/src/cgroup.rs index 829e09f..f14f276 100644 --- a/src/cgroup.rs +++ b/src/cgroup.rs @@ -176,6 +176,15 @@ impl Cgroup { dir.push(name); Self::read(self.path.child(name), &dir).await } + + pub async fn write_param( + &self, + name: impl AsRef, + value: impl AsRef<[u8]>, + ) -> fs::Result<()> { + let cg_dir = PathBuf::from(self.path.as_ref()); + fs::write(cg_dir.join(name), value).await + } } impl PartialEq for Cgroup { diff --git a/src/logger.rs b/src/logger.rs index f66c3c1..16e0742 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -9,7 +9,6 @@ use tokio::{ io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, process, sync::mpsc, - task::spawn_blocking, time::{sleep, Duration}, }; @@ -52,48 +51,33 @@ impl<'t> Logger<'t> { cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); if let Some(cgroup) = cgroup.as_deref() { let mut cg_path = PathBuf::from(cgroup::ROOT); + cg_path.push(cgroup); + cg_path.push(self.log_name); - let mut parts = cgroup.split('/').chain([self.log_name].into_iter()); - let mut part = parts.next().unwrap(); // 1 element guaranteed - - loop { - cg_path.push(part); - - use std::io::ErrorKind as K; - match tokio::fs::create_dir(&cg_path).await { - Ok(_) => log::debug!("created dir {}", cg_path.display()), - Err(e) if e.kind() == K::AlreadyExists => { - log::debug!("existing dir {}", cg_path.display()) - } - Err(e) => Err(e)?, + use std::io::ErrorKind as K; + match tokio::fs::create_dir(&cg_path).await { + Ok(_) => debug!("created dir {}", cg_path.display()), + Err(e) if e.kind() == K::AlreadyExists => { + debug!("existing dir {}", cg_path.display()) } - - let Some(next_part) = parts.next() else { - break; - }; - - let control_file = &cg_path.join("cgroup.subtree_control"); - log::debug!("control file {}", control_file.display()); - tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?; - - part = next_part; + Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()), } let procs_file = cg_path.join("cgroup.procs"); - log::debug!("procs file {}", procs_file.display()); + debug!("procs file {}", procs_file.display()); fs::write(&procs_file, b"0").await?; } let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::channel(8); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); // forward signals if let Some(child_pid) = child.id() { - spawn_blocking(move || forward_signals_to(child_pid as i32)).await?; + forward_signals_to(child_pid as i32); } // handle output @@ -153,6 +137,9 @@ impl<'t> Logger<'t> { } out.write_all(&log.line).await?; + if log.line.last() != Some(&b'\n') { + out.write("↵\n".as_bytes()).await?; + } Ok(()) } @@ -166,29 +153,28 @@ impl<'t> Logger<'t> { .open(log_file) .await?; - let link_src = &format!( - "{path}/{name}.log", - path = self.log_path, - name = self.log_name - ); - let link_tgt = log_file - .strip_prefix(&format!("{}/", self.log_path)) - .unwrap_or(log_file); + let link_src = &PathBuf::from(self.log_path) + .join(self.log_name) + .with_added_extension("log"); + let link_tgt = &self.archive_rel_path(ts); let _ = fs::remove_file(link_src).await; - fs::symlink(link_tgt, link_src) - .await - .map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?; + fs::symlink(link_tgt, link_src).await.map_err(|e| { + format_err!( + "symlink {s} -> {t} failed: {e}", + s = link_src.display(), + t = link_tgt.display() + ) + })?; Ok(file) } - fn archive_path(&self, ts: Timestamp) -> String { - format!( - "{path}/archives/{file}", - path = self.log_path, - file = self.archive_file(ts) - ) + fn archive_path(&self, ts: Timestamp) -> PathBuf { + PathBuf::from(self.log_path).join(self.archive_rel_path(ts)) + } + fn archive_rel_path(&self, ts: Timestamp) -> PathBuf { + PathBuf::from("archives").join(self.archive_file(ts)) } fn archive_file(&self, ts: Timestamp) -> String { format!( @@ -206,7 +192,7 @@ fn forward_signals_to(pid: i32) { }; use signal_hook::{consts::*, low_level::register}; - log::debug!("forwarding signals to pid {pid}"); + debug!("forwarding signals to pid {pid}"); let pid = Pid::from_raw(pid); let signals = [ @@ -219,7 +205,7 @@ fn forward_signals_to(pid: i32) { }; unsafe { register(sig, move || { - log::debug!("forwarding {signal} to {pid}"); + debug!("forwarding {signal} to {pid}"); let _ = kill(pid, signal); }) .ok(); @@ -233,33 +219,67 @@ struct LogItem { line: Vec, } -async fn copy( - stream_name: &'static str, - out: impl AsyncRead + Unpin, - tx: mpsc::UnboundedSender, -) { - let mut out = BufReader::new(out); +async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender) { let buf_size = page_size::get(); + let mut out = BufReader::with_capacity(buf_size, out); + let mut line = Vec::with_capacity(buf_size); + + macro_rules! send_line { + () => { + let log = LogItem { + stream_name, + ts: chrono::Utc::now(), + line: line.clone(), + }; + if let Err(e) = tx.send(log).await { + warn!("send line failed: {e}"); + return; + } + line.clear(); + }; + } + loop { - let mut line = Vec::with_capacity(buf_size); - if let Err(e) = out.read_until(b'\n', &mut line).await { - warn!("read {stream_name} failed: {e}"); - return; - } - if line.is_empty() { + let Ok(buf) = (out.fill_buf()) + .await + .inspect_err(|e| warn!("read {stream_name} failed: {e}")) + else { + break; + }; + + if buf.is_empty() { break; } - let log = LogItem { - stream_name, - ts: chrono::Utc::now(), - line, - }; - if let Err(e) = tx.send(log) { - warn!("send line failed: {e}"); - return; + let remaining = buf_size - line.len(); + + if let Some(pos) = memchr::memchr(b'\n', buf) { + let len = pos + 1; + let mut buf = &buf[..len]; + + if len > remaining { + line.extend_from_slice(&buf[..remaining]); + send_line!(); + buf = &buf[remaining..]; + } + + line.extend_from_slice(buf); + out.consume(len); + send_line!(); + } else if buf.len() > remaining { + line.extend_from_slice(&buf[..remaining]); + out.consume(remaining); + send_line!(); + } else { + line.extend_from_slice(buf); + let len = buf.len(); + out.consume(len); } } + + if !line.is_empty() { + send_line!(); + } } pub fn trunc_ts(ts: Timestamp) -> Timestamp { @@ -319,14 +339,14 @@ async fn compress(path: impl AsRef) { let mut out = ZstdEncoder::new(out); async { tokio::io::copy(&mut input, &mut out).await?; - out.flush().await + out.shutdown().await } .await .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; fs::remove_file(path) .await - .map_err(|e| format_err!("remove {path_str} failed: {e}")) + .map_err(|e| format_err!("remove {path_str} failed: {e}")) } .await; @@ -336,8 +356,9 @@ async fn compress(path: impl AsRef) { } pub fn parse_ts(ts: &str) -> std::result::Result { - let dt = - chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?; + let format = &format!("{TS_FORMAT}%M%S"); + let full_ts = &format!("{ts}0000"); + let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?; Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) } @@ -389,16 +410,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result bool { + self.timestamp == other.timestamp + } +} +impl Ord for LogFile { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.timestamp.cmp(&other.timestamp) + } +} impl PartialOrd for LogFile { fn partial_cmp(&self, other: &Self) -> Option { - self.timestamp.partial_cmp(&other.timestamp) + Some(self.cmp(other)) } }