use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder}; use chrono::{DurationRound, TimeDelta, Utc}; use eyre::{format_err, Result}; use log::{debug, error, warn}; use std::path::{Path, PathBuf}; use std::process::Stdio; use tokio::{ fs::{self, File}, io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, process, sync::mpsc, time::{sleep, Duration}, }; pub type Timestamp = chrono::DateTime; const TS_FORMAT: &str = "%Y%m%d_%H"; const PREFIX_TS_FORMAT: &str = "%H:%M:%S"; const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1); const FLUSH_INTERVAL: Duration = Duration::from_secs(1); const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1); pub struct Logger<'t> { pub log_path: &'t str, pub log_name: &'t str, pub with_prefix: bool, } impl<'t> Logger<'t> { pub async fn run(&self, command: &str, args: &[String]) -> Result<()> { // make sure we can at least open the log before starting the command let archives_path = &format!("{path}/archives", path = self.log_path); (fs::create_dir_all(archives_path).await) .map_err(|e| format_err!("failed to create archives dir: {e}"))?; let archives_read_dir = (fs::read_dir(archives_path).await) .map_err(|e| format_err!("failed to list archives: {e}"))?; let mut prev_stamp = trunc_ts(Utc::now()); let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?); tokio::spawn(compress_archives( archives_read_dir, self.log_name.to_string(), prev_stamp.format(TS_FORMAT).to_string(), )); // start the command let mut child = process::Command::new(command) .args(args) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); // handle output loop { tokio::select!( r = rx.recv() => { let Some(log) = r else { break; }; while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await { error!("log failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } } _ = flush_ticker.tick() => { if let Err(e) = current_log.flush().await { warn!("log flush failed: {e}"); } } ); } // finalize while let Err(e) = current_log.flush().await { error!("final log flush failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } let status = child.wait().await?; std::process::exit(status.code().unwrap_or(-1)); } async fn log_item( &self, log: &LogItem, prev_stamp: &mut Timestamp, out: &mut BufWriter, ) -> Result<()> { let trunc_ts = trunc_ts(log.ts); if *prev_stamp < trunc_ts { // switch log out.flush().await?; *out = BufWriter::new(self.open_log(trunc_ts).await?); let prev_path = self.archive_path(*prev_stamp); tokio::spawn(async move { compress(&prev_path).await }); *prev_stamp = trunc_ts; }; if self.with_prefix { let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name); out.write_all(prefix.as_bytes()).await?; } out.write_all(&log.line).await?; Ok(()) } async fn open_log(&self, ts: Timestamp) -> Result { let log_file = &self.archive_path(ts); let file = File::options() .create(true) .append(true) .write(true) .open(log_file) .await?; let link_src = &format!( "{path}/{name}.log", path = self.log_path, name = self.log_name ); let link_tgt = log_file .strip_prefix(&format!("{}/", self.log_path)) .unwrap_or(log_file); let _ = fs::remove_file(link_src).await; fs::symlink(link_tgt, link_src) .await .map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?; Ok(file) } fn archive_path(&self, ts: Timestamp) -> String { format!( "{path}/archives/{file}", path = self.log_path, file = self.archive_file(ts) ) } fn archive_file(&self, ts: Timestamp) -> String { format!( "{name}.{ts}.log", name = self.log_name, ts = ts.format(TS_FORMAT), ) } } struct LogItem { stream_name: &'static str, ts: chrono::DateTime, line: Vec, } async fn copy( stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::UnboundedSender, ) { let mut out = BufReader::new(out); let buf_size = page_size::get(); loop { let mut line = Vec::with_capacity(buf_size); if let Err(e) = out.read_until(b'\n', &mut line).await { warn!("read {stream_name} failed: {e}"); return; } if line.is_empty() { break; } let log = LogItem { stream_name, ts: chrono::Utc::now(), line, }; if let Err(e) = tx.send(log) { warn!("send line failed: {e}"); return; } } } pub fn trunc_ts(ts: Timestamp) -> Timestamp { ts.duration_trunc(TRUNC_DELTA) .expect("duration_trunc failed") } async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) { loop { let Ok(Some(entry)) = (read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}")) else { return; }; let file_name = entry.file_name(); let Some(file_name) = file_name.to_str() else { continue; }; let Some(name) = file_name.strip_suffix(".log") else { continue; }; let Some((name, ts)) = name.rsplit_once('.') else { continue; }; if name != log_name { continue; } if ts == exclude_ts { continue; } let file_path = entry.path(); let Some(file_path) = file_path.to_str() else { continue; }; compress(file_path).await; } } async fn compress(path: impl AsRef) { let path = path.as_ref(); let result = async { let path_str = path.to_string_lossy(); debug!("compressing {path_str}"); let mut input = File::open(path) .await .map_err(|e| format_err!("open {path_str} failed: {e}"))?; let out_path = path.with_extension("zstd"); let out = (File::create(&out_path).await) // create output .map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?; let mut out = ZstdEncoder::new(out); async { tokio::io::copy(&mut input, &mut out).await?; out.flush().await } .await .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; fs::remove_file(path) .await .map_err(|e| format_err!("remove {path_str} failed: {e}")) } .await; if let Err(e) = result { warn!("{e}"); } } pub fn parse_ts(ts: &str) -> std::result::Result { let dt = chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?; Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) } pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result> { let mut dir = PathBuf::from(log_path); dir.push("archives"); let mut entries = Vec::new(); let mut read_dir = fs::read_dir(dir).await?; while let Some(entry) = read_dir.next_entry().await? { let file_name = entry.file_name(); let Some(file_name) = file_name.to_str() else { continue; }; let (name, compressed) = file_name .strip_suffix(".zstd") .map_or((file_name, false), |s| (s, true)); let Some(name) = name.strip_suffix(".log") else { continue; }; let Some((name, timestamp)) = name.rsplit_once('.') else { continue; }; if name != log_name { continue; }; let Ok(timestamp) = parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}")) else { continue; }; entries.push(LogFile { path: entry.path(), compressed, timestamp, }); } Ok(entries) } #[derive(Debug, PartialEq, Eq, Ord)] pub struct LogFile { pub path: PathBuf, pub compressed: bool, pub timestamp: Timestamp, } impl PartialOrd for LogFile { fn partial_cmp(&self, other: &Self) -> Option { self.timestamp.partial_cmp(&other.timestamp) } } impl LogFile { pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result { let input = &mut File::open(&self.path).await?; if self.compressed { let out = &mut ZstdDecoder::new(out); tokio::io::copy(input, out).await } else { tokio::io::copy(input, out).await } } }