From 4c90b5af71167ffa128fd69dd7557a5948b91731 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mika=C3=ABl=20Cluseau?= Date: Tue, 14 Apr 2026 10:52:36 +0200 Subject: [PATCH] logger: cleanup --- Cargo.lock | 1 + Cargo.toml | 1 + src/logger.rs | 151 +++++++++++++++++++++++++++++++++----------------- 3 files changed, 101 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46d8b29..0d23e67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,6 +353,7 @@ dependencies = [ "human-units", "log", "lz4", + "memchr", "nix", "openssl", "page_size", diff --git a/Cargo.toml b/Cargo.toml index 1bdd6cd..832e3f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ hex = "0.4.3" human-units = "0.5.3" log = "0.4.27" lz4 = "1.28.1" +memchr = "2.8.0" nix = { version = "0.31.2", features = ["process", "signal", "user"] } openssl = "0.10.73" page_size = "0.6.0" diff --git a/src/logger.rs b/src/logger.rs index f66c3c1..9b16618 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -9,7 +9,6 @@ use tokio::{ io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, process, sync::mpsc, - task::spawn_blocking, time::{sleep, Duration}, }; @@ -61,9 +60,9 @@ impl<'t> Logger<'t> { use std::io::ErrorKind as K; match tokio::fs::create_dir(&cg_path).await { - Ok(_) => log::debug!("created dir {}", cg_path.display()), + Ok(_) => debug!("created dir {}", cg_path.display()), Err(e) if e.kind() == K::AlreadyExists => { - log::debug!("existing dir {}", cg_path.display()) + debug!("existing dir {}", cg_path.display()) } Err(e) => Err(e)?, } @@ -73,27 +72,27 @@ impl<'t> Logger<'t> { }; let control_file = &cg_path.join("cgroup.subtree_control"); - log::debug!("control file {}", control_file.display()); + debug!("control file {}", control_file.display()); tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?; part = next_part; } let procs_file = cg_path.join("cgroup.procs"); - log::debug!("procs file {}", procs_file.display()); + debug!("procs file {}", procs_file.display()); fs::write(&procs_file, b"0").await?; } let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::channel(8); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); // forward signals if let Some(child_pid) = child.id() { - spawn_blocking(move || forward_signals_to(child_pid as i32)).await?; + forward_signals_to(child_pid as i32); } // handle output @@ -153,6 +152,9 @@ impl<'t> Logger<'t> { } out.write_all(&log.line).await?; + if log.line.last() != Some(&b'\n') { + out.write("↵\n".as_bytes()).await?; + } Ok(()) } @@ -166,29 +168,28 @@ impl<'t> Logger<'t> { .open(log_file) .await?; - let link_src = &format!( - "{path}/{name}.log", - path = self.log_path, - name = self.log_name - ); - let link_tgt = log_file - .strip_prefix(&format!("{}/", self.log_path)) - .unwrap_or(log_file); + let link_src = &PathBuf::from(self.log_path) + .join(self.log_name) + .with_added_extension("log"); + let link_tgt = &self.archive_rel_path(ts); let _ = fs::remove_file(link_src).await; - fs::symlink(link_tgt, link_src) - .await - .map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?; + fs::symlink(link_tgt, link_src).await.map_err(|e| { + format_err!( + "symlink {s} -> {t} failed: {e}", + s = link_src.display(), + t = link_tgt.display() + ) + })?; Ok(file) } - fn archive_path(&self, ts: Timestamp) -> String { - format!( - "{path}/archives/{file}", - path = self.log_path, - file = self.archive_file(ts) - ) + fn archive_path(&self, ts: Timestamp) -> PathBuf { + PathBuf::from(self.log_path).join(self.archive_rel_path(ts)) + } + fn archive_rel_path(&self, ts: Timestamp) -> PathBuf { + PathBuf::from("archives").join(self.archive_file(ts)) } fn archive_file(&self, ts: Timestamp) -> String { format!( @@ -206,7 +207,7 @@ fn forward_signals_to(pid: i32) { }; use signal_hook::{consts::*, low_level::register}; - log::debug!("forwarding signals to pid {pid}"); + debug!("forwarding signals to pid {pid}"); let pid = Pid::from_raw(pid); let signals = [ @@ -219,7 +220,7 @@ fn forward_signals_to(pid: i32) { }; unsafe { register(sig, move || { - log::debug!("forwarding {signal} to {pid}"); + debug!("forwarding {signal} to {pid}"); let _ = kill(pid, signal); }) .ok(); @@ -233,33 +234,67 @@ struct LogItem { line: Vec, } -async fn copy( - stream_name: &'static str, - out: impl AsyncRead + Unpin, - tx: mpsc::UnboundedSender, -) { - let mut out = BufReader::new(out); +async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender) { let buf_size = page_size::get(); + let mut out = BufReader::with_capacity(buf_size, out); + let mut line = Vec::with_capacity(buf_size); + + macro_rules! send_line { + () => { + let log = LogItem { + stream_name, + ts: chrono::Utc::now(), + line: line.clone(), + }; + if let Err(e) = tx.send(log).await { + warn!("send line failed: {e}"); + return; + } + line.clear(); + }; + } + loop { - let mut line = Vec::with_capacity(buf_size); - if let Err(e) = out.read_until(b'\n', &mut line).await { - warn!("read {stream_name} failed: {e}"); - return; - } - if line.is_empty() { + let Ok(buf) = (out.fill_buf()) + .await + .inspect_err(|e| warn!("read {stream_name} failed: {e}")) + else { + break; + }; + + if buf.is_empty() { break; } - let log = LogItem { - stream_name, - ts: chrono::Utc::now(), - line, - }; - if let Err(e) = tx.send(log) { - warn!("send line failed: {e}"); - return; + let remaining = buf_size - line.len(); + + if let Some(pos) = memchr::memchr(b'\n', buf) { + let len = pos + 1; + let mut buf = &buf[..len]; + + if len > remaining { + line.extend_from_slice(&buf[..remaining]); + send_line!(); + buf = &buf[remaining..]; + } + + line.extend_from_slice(buf); + out.consume(len); + send_line!(); + } else if buf.len() > remaining { + line.extend_from_slice(&buf[..remaining]); + out.consume(remaining); + send_line!(); + } else { + line.extend_from_slice(buf); + let len = buf.len(); + out.consume(len); } } + + if !line.is_empty() { + send_line!(); + } } pub fn trunc_ts(ts: Timestamp) -> Timestamp { @@ -319,14 +354,14 @@ async fn compress(path: impl AsRef) { let mut out = ZstdEncoder::new(out); async { tokio::io::copy(&mut input, &mut out).await?; - out.flush().await + out.shutdown().await } .await .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; fs::remove_file(path) .await - .map_err(|e| format_err!("remove {path_str} failed: {e}")) + .map_err(|e| format_err!("remove {path_str} failed: {e}")) } .await; @@ -336,8 +371,9 @@ async fn compress(path: impl AsRef) { } pub fn parse_ts(ts: &str) -> std::result::Result { - let dt = - chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?; + let format = &format!("{TS_FORMAT}%M%S"); + let full_ts = &format!("{ts}0000"); + let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?; Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) } @@ -389,16 +425,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result bool { + self.timestamp == other.timestamp + } +} +impl Ord for LogFile { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.timestamp.cmp(&other.timestamp) + } +} impl PartialOrd for LogFile { fn partial_cmp(&self, other: &Self) -> Option { - self.timestamp.partial_cmp(&other.timestamp) + Some(self.cmp(other)) } }