use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder}; use chrono::{DurationRound, TimeDelta, Utc}; use eyre::{format_err, Result}; use log::{debug, error, warn}; use std::path::{Path, PathBuf}; use std::process::Stdio; use tokio::{ fs::File, io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, process, sync::mpsc, time::{sleep, Duration}, }; use crate::{cgroup, fs}; pub type Timestamp = chrono::DateTime; const TS_FORMAT: &str = "%Y%m%d_%H"; const PREFIX_TS_FORMAT: &str = "%H:%M:%S"; const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1); const FLUSH_INTERVAL: Duration = Duration::from_secs(1); const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1); pub struct Logger<'t> { pub log_path: &'t str, pub log_name: &'t str, pub with_prefix: bool, } impl<'t> Logger<'t> { pub async fn run(&self, cgroup: Option, command: &str, args: &[String]) -> Result<()> { // make sure we can at least open the log before starting the command let archives_path = &format!("{path}/archives", path = self.log_path); (fs::create_dir_all(archives_path).await) .map_err(|e| format_err!("failed to create archives dir: {e}"))?; let archives_read_dir = (fs::read_dir(archives_path).await) .map_err(|e| format_err!("failed to list archives: {e}"))?; let mut prev_stamp = trunc_ts(Utc::now()); let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?); tokio::spawn(compress_archives( archives_read_dir, self.log_name.to_string(), prev_stamp.format(TS_FORMAT).to_string(), )); // start the command let mut cmd = process::Command::new(command); cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); if let Some(cgroup) = cgroup.as_deref() { let mut cg_path = PathBuf::from(cgroup::ROOT); cg_path.push(cgroup); cg_path.push(self.log_name); use std::io::ErrorKind as K; match tokio::fs::create_dir(&cg_path).await { Ok(_) => debug!("created dir {}", cg_path.display()), Err(e) if e.kind() == K::AlreadyExists => { debug!("existing dir {}", cg_path.display()) } Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()), } let procs_file = cg_path.join("cgroup.procs"); debug!("procs file {}", procs_file.display()); fs::write(&procs_file, b"0").await?; } let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; let (tx, mut rx) = mpsc::channel(8); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); // forward signals if let Some(child_pid) = child.id() { forward_signals_to(child_pid as i32); } // handle output let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); loop { tokio::select!( r = rx.recv() => { let Some(log) = r else { break; }; while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await { error!("log failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } } _ = flush_ticker.tick() => { if let Err(e) = current_log.flush().await { warn!("log flush failed: {e}"); } } ); } let status = child.wait().await?; // finalize while let Err(e) = current_log.flush().await { error!("final log flush failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } std::process::exit(status.code().unwrap_or(-1)); } async fn log_item( &self, log: &LogItem, prev_stamp: &mut Timestamp, out: &mut BufWriter, ) -> Result<()> { let trunc_ts = trunc_ts(log.ts); if *prev_stamp < trunc_ts { // switch log out.flush().await?; *out = BufWriter::new(self.open_log(trunc_ts).await?); let prev_path = self.archive_path(*prev_stamp); tokio::spawn(async move { compress(&prev_path).await }); *prev_stamp = trunc_ts; }; if self.with_prefix { let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name); out.write_all(prefix.as_bytes()).await?; } out.write_all(&log.line).await?; if log.line.last() != Some(&b'\n') { out.write("↵\n".as_bytes()).await?; } Ok(()) } async fn open_log(&self, ts: Timestamp) -> Result { let log_file = &self.archive_path(ts); let file = File::options() .create(true) .append(true) .write(true) .open(log_file) .await?; let link_src = &PathBuf::from(self.log_path) .join(self.log_name) .with_added_extension("log"); let link_tgt = &self.archive_rel_path(ts); let _ = fs::remove_file(link_src).await; fs::symlink(link_tgt, link_src).await.map_err(|e| { format_err!( "symlink {s} -> {t} failed: {e}", s = link_src.display(), t = link_tgt.display() ) })?; Ok(file) } fn archive_path(&self, ts: Timestamp) -> PathBuf { PathBuf::from(self.log_path).join(self.archive_rel_path(ts)) } fn archive_rel_path(&self, ts: Timestamp) -> PathBuf { PathBuf::from("archives").join(self.archive_file(ts)) } fn archive_file(&self, ts: Timestamp) -> String { format!( "{name}.{ts}.log", name = self.log_name, ts = ts.format(TS_FORMAT), ) } } fn forward_signals_to(pid: i32) { use nix::{ sys::signal::{kill, Signal}, unistd::Pid, }; use signal_hook::{consts::*, low_level::register}; debug!("forwarding signals to pid {pid}"); let pid = Pid::from_raw(pid); let signals = [ SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM, ]; for sig in signals { let Ok(signal) = Signal::try_from(sig) else { continue; }; unsafe { register(sig, move || { debug!("forwarding {signal} to {pid}"); let _ = kill(pid, signal); }) .ok(); } } } struct LogItem { stream_name: &'static str, ts: chrono::DateTime, line: Vec, } async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender) { let buf_size = page_size::get(); let mut out = BufReader::with_capacity(buf_size, out); let mut line = Vec::with_capacity(buf_size); macro_rules! send_line { () => { let log = LogItem { stream_name, ts: chrono::Utc::now(), line: line.clone(), }; if let Err(e) = tx.send(log).await { warn!("send line failed: {e}"); return; } line.clear(); }; } loop { let Ok(buf) = (out.fill_buf()) .await .inspect_err(|e| warn!("read {stream_name} failed: {e}")) else { break; }; if buf.is_empty() { break; } let remaining = buf_size - line.len(); if let Some(pos) = memchr::memchr(b'\n', buf) { let len = pos + 1; let mut buf = &buf[..len]; if len > remaining { line.extend_from_slice(&buf[..remaining]); send_line!(); buf = &buf[remaining..]; } line.extend_from_slice(buf); out.consume(len); send_line!(); } else if buf.len() > remaining { line.extend_from_slice(&buf[..remaining]); out.consume(remaining); send_line!(); } else { line.extend_from_slice(buf); let len = buf.len(); out.consume(len); } } if !line.is_empty() { send_line!(); } } pub fn trunc_ts(ts: Timestamp) -> Timestamp { ts.duration_trunc(TRUNC_DELTA) .expect("duration_trunc failed") } async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) { loop { let Ok(Some(entry)) = (read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}")) else { return; }; let file_name = entry.file_name(); let Some(file_name) = file_name.to_str() else { continue; }; let Some(name) = file_name.strip_suffix(".log") else { continue; }; let Some((name, ts)) = name.rsplit_once('.') else { continue; }; if name != log_name { continue; } if ts == exclude_ts { continue; } let file_path = entry.path(); let Some(file_path) = file_path.to_str() else { continue; }; compress(file_path).await; } } async fn compress(path: impl AsRef) { let path = path.as_ref(); let result = async { let path_str = path.to_string_lossy(); debug!("compressing {path_str}"); let mut input = File::open(path) .await .map_err(|e| format_err!("open {path_str} failed: {e}"))?; let out_path = path.with_extension("zst"); let out = (File::create(&out_path).await) // create output .map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?; let mut out = ZstdEncoder::new(out); async { tokio::io::copy(&mut input, &mut out).await?; out.shutdown().await } .await .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; fs::remove_file(path) .await .map_err(|e| format_err!("remove {path_str} failed: {e}")) } .await; if let Err(e) = result { warn!("{e}"); } } pub fn parse_ts(ts: &str) -> std::result::Result { let format = &format!("{TS_FORMAT}%M%S"); let full_ts = &format!("{ts}0000"); let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?; Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) } pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result> { let mut dir = PathBuf::from(log_path); dir.push("archives"); let mut entries = Vec::new(); let mut read_dir = fs::read_dir(&dir).await?; while let Some(entry) = (read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))? { let file_name = entry.file_name(); let Some(file_name) = file_name.to_str() else { continue; }; let Some((name, ext)) = file_name.rsplit_once('.') else { continue; }; let compressed = match ext { "zst" => true, "log" => false, _ => continue, }; let Some((name, timestamp)) = name.rsplit_once('.') else { continue; }; if name != log_name { continue; }; let Ok(timestamp) = parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}")) else { continue; }; entries.push(LogFile { path: entry.path(), compressed, timestamp, }); } Ok(entries) } #[derive(Debug)] pub struct LogFile { pub path: PathBuf, pub compressed: bool, pub timestamp: Timestamp, } impl Eq for LogFile {} impl PartialEq for LogFile { fn eq(&self, other: &Self) -> bool { self.timestamp == other.timestamp } } impl Ord for LogFile { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.timestamp.cmp(&other.timestamp) } } impl PartialOrd for LogFile { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl LogFile { pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result { let input = &mut File::open(&self.path).await?; if self.compressed { let out = &mut ZstdDecoder::new(out); tokio::io::copy(input, out).await } else { tokio::io::copy(input, out).await } } }