2025-07-20 18:48:47 +02:00
|
|
|
|
use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
|
|
|
|
|
|
use chrono::{DurationRound, TimeDelta, Utc};
|
2026-02-21 18:15:39 +01:00
|
|
|
|
use eyre::{Result, format_err};
|
2025-07-20 18:48:47 +02:00
|
|
|
|
use log::{debug, error, warn};
|
|
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
|
|
use std::process::Stdio;
|
|
|
|
|
|
use tokio::{
|
2026-04-13 21:11:07 +02:00
|
|
|
|
fs::File,
|
2025-07-20 18:48:47 +02:00
|
|
|
|
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
|
|
|
|
|
process,
|
|
|
|
|
|
sync::mpsc,
|
2026-02-21 18:15:39 +01:00
|
|
|
|
time::{Duration, sleep},
|
2025-07-20 18:48:47 +02:00
|
|
|
|
};
|
|
|
|
|
|
|
2026-04-13 21:11:07 +02:00
|
|
|
|
use crate::{cgroup, fs};
|
|
|
|
|
|
|
2025-07-20 18:48:47 +02:00
|
|
|
|
/// UTC timestamp type used throughout the logger.
pub type Timestamp = chrono::DateTime<Utc>;

/// Timestamp format embedded in archive file names (hour granularity).
const TS_FORMAT: &str = "%Y%m%d_%H";

/// Timestamp format for the optional per-line prefix.
const PREFIX_TS_FORMAT: &str = "%H:%M:%S";

/// Log-rotation granularity; timestamps are truncated to this delta.
const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);

/// How often buffered log output is flushed to disk.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Back-off between retries of failed log writes/flushes.
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);
|
|
|
|
|
|
|
|
|
|
|
|
/// Runs a command and captures its output into hourly, zstd-archived logs.
pub struct Logger<'t> {
    /// Directory holding the live `<log_name>.log` symlink and the
    /// `archives/` subdirectory.
    pub log_path: &'t str,
    /// Base name used for log files, the live symlink, and (when a cgroup
    /// is configured) the cgroup leaf directory.
    pub log_name: &'t str,
    /// When true, each line is prefixed with `HH:MM:SS <stream>| `.
    pub with_prefix: bool,
}
|
|
|
|
|
|
|
|
|
|
|
|
impl<'t> Logger<'t> {
|
2026-04-13 21:11:07 +02:00
|
|
|
|
/// Spawns `command` with `args`, captures its stdout/stderr into hourly
/// log archives, and finally exits the current process with the child's
/// exit code (or -1 if no code is available, e.g. killed by a signal) —
/// on success this function never returns to the caller.
///
/// When `cgroup` is given, a cgroup hierarchy
/// `<cgroup::ROOT>/<cgroup segments>/<log_name>` is created, controllers
/// are enabled on intermediate levels, and `0` is written to the leaf's
/// `cgroup.procs` (which, per cgroup-v2 semantics, moves the calling
/// process — and thus the child it spawns — into that cgroup).
///
/// # Errors
/// Returns an error if the archives directory cannot be prepared, the
/// cgroup cannot be set up, or the command fails to spawn.
pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
    // make sure we can at least open the log before starting the command
    let archives_path = &format!("{path}/archives", path = self.log_path);
    (fs::create_dir_all(archives_path).await)
        .map_err(|e| format_err!("failed to create archives dir: {e}"))?;
    let archives_read_dir = (fs::read_dir(archives_path).await)
        .map_err(|e| format_err!("failed to list archives: {e}"))?;

    let mut prev_stamp = trunc_ts(Utc::now());
    let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?);

    // Compress any archives left over from previous runs, in the
    // background; the current stamp is excluded because it is live.
    tokio::spawn(compress_archives(
        archives_read_dir,
        self.log_name.to_string(),
        prev_stamp.format(TS_FORMAT).to_string(),
    ));

    // start the command
    let mut cmd = process::Command::new(command);
    cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
    if let Some(cgroup) = cgroup.as_deref() {
        let mut cg_path = PathBuf::from(cgroup::ROOT);

        // Walk the hierarchy segment by segment, with `log_name` as the
        // final (leaf) segment.
        let mut parts = cgroup.split('/').chain([self.log_name].into_iter());
        let mut part = parts.next().unwrap(); // 1 element guaranteed

        loop {
            cg_path.push(part);

            use std::io::ErrorKind as K;
            // An already-existing level is fine; anything else is fatal.
            match tokio::fs::create_dir(&cg_path).await {
                Ok(_) => log::debug!("created dir {}", cg_path.display()),
                Err(e) if e.kind() == K::AlreadyExists => {
                    log::debug!("existing dir {}", cg_path.display())
                }
                Err(e) => Err(e)?,
            }

            let Some(next_part) = parts.next() else {
                break;
            };

            // Enable controllers for children on every non-leaf level
            // (cgroup.subtree_control must not be written on the leaf the
            // process will join).
            let control_file = &cg_path.join("cgroup.subtree_control");
            log::debug!("control file {}", control_file.display());
            tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?;

            part = next_part;
        }

        // Writing "0" to cgroup.procs moves the calling process into the
        // leaf cgroup; the spawned child inherits it.
        let procs_file = cg_path.join("cgroup.procs");
        log::debug!("procs file {}", procs_file.display());
        fs::write(&procs_file, b"0").await?;
    }

    let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;

    // Both stream readers feed one channel; `rx` yields None once both
    // senders are dropped (i.e. both pipes hit EOF).
    let (tx, mut rx) = mpsc::unbounded_channel();

    tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
    tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));

    // forward signals
    if let Some(child_pid) = child.id() {
        // Dedicated OS thread: the signal iterator blocks.
        std::thread::spawn(move || forward_signals_to(child_pid as i32));
    }

    // handle output
    let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
    loop {
        tokio::select!(
            r = rx.recv() => {
                let Some(log) = r else { break; };

                // Retry forever: losing log lines is worse than stalling.
                while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await
                {
                    error!("log failed: {e}");
                    sleep(WRITE_RETRY_DELAY).await;
                }
            }
            _ = flush_ticker.tick() => {
                // Periodic flush so the live log stays reasonably fresh.
                if let Err(e) = current_log.flush().await {
                    warn!("log flush failed: {e}");
                }
            }
        );
    }

    let status = child.wait().await?;

    // finalize
    while let Err(e) = current_log.flush().await {
        error!("final log flush failed: {e}");
        sleep(WRITE_RETRY_DELAY).await;
    }

    std::process::exit(status.code().unwrap_or(-1));
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Writes one captured line to the current log, rotating to a fresh
/// archive file first when `log.ts` falls into a newer hour bucket.
///
/// On rotation the just-finished archive is compressed in a detached
/// background task and `prev_stamp` is advanced to the new bucket.
///
/// # Errors
/// Returns an error when opening the new archive or writing fails; the
/// caller retries the whole item in that case.
async fn log_item(
    &self,
    log: &LogItem,
    prev_stamp: &mut Timestamp,
    out: &mut BufWriter<File>,
) -> Result<()> {
    let trunc_ts = trunc_ts(log.ts);
    if *prev_stamp < trunc_ts {
        // switch log
        out.flush().await?;
        *out = BufWriter::new(self.open_log(trunc_ts).await?);

        // Compress the archive we just finished, off the hot path.
        let prev_path = self.archive_path(*prev_stamp);
        tokio::spawn(async move { compress(&prev_path).await });

        *prev_stamp = trunc_ts;
    };

    if self.with_prefix {
        let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name);
        out.write_all(prefix.as_bytes()).await?;
    }

    out.write_all(&log.line).await?;
    Ok(())
}
|
|
|
|
|
|
|
|
|
|
|
|
async fn open_log(&self, ts: Timestamp) -> Result<File> {
|
|
|
|
|
|
let log_file = &self.archive_path(ts);
|
|
|
|
|
|
|
|
|
|
|
|
let file = File::options()
|
|
|
|
|
|
.create(true)
|
|
|
|
|
|
.append(true)
|
|
|
|
|
|
.write(true)
|
|
|
|
|
|
.open(log_file)
|
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
|
|
let link_src = &format!(
|
|
|
|
|
|
"{path}/{name}.log",
|
|
|
|
|
|
path = self.log_path,
|
|
|
|
|
|
name = self.log_name
|
|
|
|
|
|
);
|
|
|
|
|
|
let link_tgt = log_file
|
|
|
|
|
|
.strip_prefix(&format!("{}/", self.log_path))
|
|
|
|
|
|
.unwrap_or(log_file);
|
|
|
|
|
|
|
|
|
|
|
|
let _ = fs::remove_file(link_src).await;
|
|
|
|
|
|
fs::symlink(link_tgt, link_src)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?;
|
|
|
|
|
|
|
|
|
|
|
|
Ok(file)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Full path of the archive file for hour bucket `ts`, i.e.
/// `<log_path>/archives/<log_name>.<stamp>.log`.
fn archive_path(&self, ts: Timestamp) -> String {
    let file = self.archive_file(ts);
    format!("{path}/archives/{file}", path = self.log_path)
}
|
|
|
|
|
|
fn archive_file(&self, ts: Timestamp) -> String {
|
|
|
|
|
|
format!(
|
|
|
|
|
|
"{name}.{ts}.log",
|
|
|
|
|
|
name = self.log_name,
|
|
|
|
|
|
ts = ts.format(TS_FORMAT),
|
|
|
|
|
|
)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-13 21:11:07 +02:00
|
|
|
|
/// Forwards common termination/user signals received by this process to
/// `pid`. Blocks on the signal iterator, so this is meant to run on a
/// dedicated OS thread (see the spawn in `Logger::run`).
fn forward_signals_to(pid: i32) {
    use nix::{
        sys::signal::{Signal, kill},
        unistd::Pid,
    };
    use signal_hook::{consts::*, iterator::Signals};

    log::debug!("forwarding signals to pid {pid}");

    let mut signals = Signals::new([
        SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
    ])
    .expect("signal setup failed");

    let pid = Pid::from_raw(pid);
    for signal in &mut signals {
        // Skip raw signal numbers nix cannot represent.
        let Ok(signal) = Signal::try_from(signal) else {
            continue;
        };
        // Best effort: the child may already have exited.
        let _ = kill(pid, signal);
    }
}
|
|
|
|
|
|
|
2025-07-20 18:48:47 +02:00
|
|
|
|
struct LogItem {
|
|
|
|
|
|
stream_name: &'static str,
|
|
|
|
|
|
ts: chrono::DateTime<chrono::Utc>,
|
|
|
|
|
|
line: Vec<u8>,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
async fn copy(
|
|
|
|
|
|
stream_name: &'static str,
|
|
|
|
|
|
out: impl AsyncRead + Unpin,
|
|
|
|
|
|
tx: mpsc::UnboundedSender<LogItem>,
|
|
|
|
|
|
) {
|
|
|
|
|
|
let mut out = BufReader::new(out);
|
|
|
|
|
|
let buf_size = page_size::get();
|
|
|
|
|
|
loop {
|
|
|
|
|
|
let mut line = Vec::with_capacity(buf_size);
|
|
|
|
|
|
if let Err(e) = out.read_until(b'\n', &mut line).await {
|
|
|
|
|
|
warn!("read {stream_name} failed: {e}");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
if line.is_empty() {
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let log = LogItem {
|
|
|
|
|
|
stream_name,
|
|
|
|
|
|
ts: chrono::Utc::now(),
|
|
|
|
|
|
line,
|
|
|
|
|
|
};
|
|
|
|
|
|
if let Err(e) = tx.send(log) {
|
|
|
|
|
|
warn!("send line failed: {e}");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-20 22:30:10 +02:00
|
|
|
|
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
2025-07-20 18:48:47 +02:00
|
|
|
|
ts.duration_trunc(TRUNC_DELTA)
|
|
|
|
|
|
.expect("duration_trunc failed")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) {
|
|
|
|
|
|
loop {
|
|
|
|
|
|
let Ok(Some(entry)) =
|
|
|
|
|
|
(read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
|
|
|
|
|
|
else {
|
|
|
|
|
|
return;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let file_name = entry.file_name();
|
|
|
|
|
|
let Some(file_name) = file_name.to_str() else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
let Some(name) = file_name.strip_suffix(".log") else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
let Some((name, ts)) = name.rsplit_once('.') else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
if name != log_name {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
if ts == exclude_ts {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let file_path = entry.path();
|
|
|
|
|
|
let Some(file_path) = file_path.to_str() else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
compress(file_path).await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
async fn compress(path: impl AsRef<Path>) {
|
|
|
|
|
|
let path = path.as_ref();
|
|
|
|
|
|
|
|
|
|
|
|
let result = async {
|
|
|
|
|
|
let path_str = path.to_string_lossy();
|
|
|
|
|
|
debug!("compressing {path_str}");
|
|
|
|
|
|
|
|
|
|
|
|
let mut input = File::open(path)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| format_err!("open {path_str} failed: {e}"))?;
|
|
|
|
|
|
|
2025-07-21 09:49:02 +02:00
|
|
|
|
let out_path = path.with_extension("zst");
|
2025-07-20 18:48:47 +02:00
|
|
|
|
let out = (File::create(&out_path).await) // create output
|
|
|
|
|
|
.map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?;
|
|
|
|
|
|
|
|
|
|
|
|
let mut out = ZstdEncoder::new(out);
|
|
|
|
|
|
async {
|
|
|
|
|
|
tokio::io::copy(&mut input, &mut out).await?;
|
|
|
|
|
|
out.flush().await
|
|
|
|
|
|
}
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
|
|
|
|
|
|
|
|
|
|
|
|
fs::remove_file(path)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
|
|
|
|
|
}
|
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
|
|
if let Err(e) = result {
|
|
|
|
|
|
warn!("{e}");
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
|
|
|
|
|
|
let dt =
|
|
|
|
|
|
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
|
|
|
|
|
|
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-13 21:11:07 +02:00
|
|
|
|
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
|
2025-07-20 18:48:47 +02:00
|
|
|
|
let mut dir = PathBuf::from(log_path);
|
|
|
|
|
|
dir.push("archives");
|
|
|
|
|
|
|
|
|
|
|
|
let mut entries = Vec::new();
|
2026-04-13 21:11:07 +02:00
|
|
|
|
let mut read_dir = fs::read_dir(&dir).await?;
|
2025-07-20 18:48:47 +02:00
|
|
|
|
|
2026-04-13 21:11:07 +02:00
|
|
|
|
while let Some(entry) =
|
|
|
|
|
|
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
|
|
|
|
|
|
{
|
2025-07-20 18:48:47 +02:00
|
|
|
|
let file_name = entry.file_name();
|
|
|
|
|
|
let Some(file_name) = file_name.to_str() else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
2025-07-21 10:11:05 +02:00
|
|
|
|
let Some((name, ext)) = file_name.rsplit_once('.') else {
|
2025-07-20 18:48:47 +02:00
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
2025-07-21 10:11:05 +02:00
|
|
|
|
let compressed = match ext {
|
|
|
|
|
|
"zst" => true,
|
|
|
|
|
|
"log" => false,
|
|
|
|
|
|
_ => continue,
|
|
|
|
|
|
};
|
|
|
|
|
|
|
2025-07-20 18:48:47 +02:00
|
|
|
|
let Some((name, timestamp)) = name.rsplit_once('.') else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
if name != log_name {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let Ok(timestamp) =
|
|
|
|
|
|
parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}"))
|
|
|
|
|
|
else {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
entries.push(LogFile {
|
|
|
|
|
|
path: entry.path(),
|
|
|
|
|
|
compressed,
|
|
|
|
|
|
timestamp,
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Ok(entries)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// An archived (possibly zstd-compressed) log file.
///
/// NOTE(review): `Ord` is derived here, so `cmp` compares fields in
/// declaration order (path, then compressed, then timestamp); any manual
/// `PartialOrd` implementation must stay consistent with that ordering.
#[derive(Debug, PartialEq, Eq, Ord)]
pub struct LogFile {
    /// Full path of the archive file.
    pub path: PathBuf,
    /// True when the file carries a `.zst` extension.
    pub compressed: bool,
    /// Hour-granularity timestamp parsed from the file name.
    pub timestamp: Timestamp,
}
|
|
|
|
|
|
|
|
|
|
|
|
impl PartialOrd for LogFile {
|
|
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
|
|
|
|
|
self.timestamp.partial_cmp(&other.timestamp)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl LogFile {
|
|
|
|
|
|
pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result<u64> {
|
2025-07-20 22:30:10 +02:00
|
|
|
|
let input = &mut File::open(&self.path).await?;
|
2025-07-20 18:48:47 +02:00
|
|
|
|
if self.compressed {
|
2025-07-20 22:30:10 +02:00
|
|
|
|
let out = &mut ZstdDecoder::new(out);
|
|
|
|
|
|
tokio::io::copy(input, out).await
|
2025-07-20 18:48:47 +02:00
|
|
|
|
} else {
|
2025-07-20 22:30:10 +02:00
|
|
|
|
tokio::io::copy(input, out).await
|
2025-07-20 18:48:47 +02:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|