//! dkl/src/logger.rs
//!
//! Captures a child process' stdout/stderr and writes them to
//! hourly-rotated log files, compressing rotated archives with zstd.
use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
use chrono::{DurationRound, TimeDelta, Utc};
use eyre::{format_err, Result};
use log::{debug, error, warn};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::{
    fs::File,
    io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
    process::{Child, Command},
    sync::mpsc,
    time::{sleep, Duration},
};

use crate::{cgroup, fs};
pub type Timestamp = chrono::DateTime<Utc>;
const TS_FORMAT: &str = "%Y%m%d_%H";
const PREFIX_TS_FORMAT: &str = "%H:%M:%S";
const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);
2026-04-16 11:53:37 +02:00
pub struct Logger {
pub log_path: PathBuf,
pub log_name: PathBuf,
2025-07-20 18:48:47 +02:00
pub with_prefix: bool,
2026-04-16 11:53:37 +02:00
pub cgroup: Option<String>,
2025-07-20 18:48:47 +02:00
}
2026-04-16 11:53:37 +02:00
impl Logger {
pub async fn setup<I, S>(&self, command: impl AsRef<OsStr>, args: I) -> fs::Result<Command>
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
2025-07-20 18:48:47 +02:00
// make sure we can at least open the log before starting the command
2026-04-16 11:53:37 +02:00
let archives_path = &self.log_path.join("archives");
fs::create_dir_all(archives_path).await?;
2025-07-20 18:48:47 +02:00
2026-04-16 11:53:37 +02:00
let archives_read_dir = fs::read_dir(archives_path).await?;
let prev_stamp = trunc_ts(Utc::now());
2025-07-20 18:48:47 +02:00
tokio::spawn(compress_archives(
archives_read_dir,
2026-04-16 11:53:37 +02:00
self.log_name.clone(),
2025-07-20 18:48:47 +02:00
prev_stamp.format(TS_FORMAT).to_string(),
));
2026-04-16 11:53:37 +02:00
// create the command
let mut cmd = Command::new(command);
cmd.args(args);
if let Some(cgroup) = self.cgroup.as_deref() {
let cg_path = PathBuf::from(cgroup::ROOT)
.join(cgroup)
.join(&self.log_name);
fs::create_dir_all(&cg_path).await?;
2026-04-13 21:11:07 +02:00
let procs_file = cg_path.join("cgroup.procs");
2026-04-14 10:52:36 +02:00
debug!("procs file {}", procs_file.display());
2026-04-16 11:53:37 +02:00
unsafe { cmd.pre_exec(move || std::fs::write(&procs_file, b"0")) };
2026-04-13 21:11:07 +02:00
}
2026-04-16 11:53:37 +02:00
Ok(cmd)
}
pub fn spawn(self, mut cmd: Command) -> std::io::Result<Child> {
// setup outputs for capture
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2025-07-20 18:48:47 +02:00
2026-04-16 11:53:37 +02:00
// spawn
let mut child = cmd.spawn()?;
// capture outputs
let (tx, rx) = mpsc::channel(8);
2025-07-20 18:48:47 +02:00
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
2026-04-16 11:53:37 +02:00
// log outputs
tokio::spawn(self.log_stream(rx));
Ok(child)
}
// TODO: Result<!> when stable
pub async fn exec(self, cmd: Command) -> Result<()> {
let mut child = self.spawn(cmd)?;
2026-04-13 21:11:07 +02:00
// forward signals
if let Some(child_pid) = child.id() {
2026-04-14 10:52:36 +02:00
forward_signals_to(child_pid as i32);
2026-04-13 21:11:07 +02:00
}
2025-07-20 18:48:47 +02:00
2026-04-16 11:53:37 +02:00
let status = child.wait().await?;
std::process::exit(status.code().unwrap_or(-1));
}
async fn log_stream(self, mut rx: mpsc::Receiver<LogItem>) {
2026-04-13 21:11:07 +02:00
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
2026-04-16 11:53:37 +02:00
let mut prev_stamp = trunc_ts(Utc::now());
let mut current_log = BufWriter::new(self.eventually_open_log(prev_stamp).await);
2025-07-20 18:48:47 +02:00
loop {
tokio::select!(
r = rx.recv() => {
let Some(log) = r else { break; };
while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await
{
error!("log failed: {e}");
sleep(WRITE_RETRY_DELAY).await;
}
}
_ = flush_ticker.tick() => {
if let Err(e) = current_log.flush().await {
warn!("log flush failed: {e}");
}
}
);
}
// finalize
2026-04-16 11:53:37 +02:00
while let Err(e) = current_log.shutdown().await {
error!("final log shutdown failed: {e}");
2025-07-20 18:48:47 +02:00
sleep(WRITE_RETRY_DELAY).await;
}
}
async fn log_item(
&self,
log: &LogItem,
prev_stamp: &mut Timestamp,
out: &mut BufWriter<File>,
) -> Result<()> {
2025-07-20 22:30:10 +02:00
let trunc_ts = trunc_ts(log.ts);
2025-07-20 18:48:47 +02:00
if *prev_stamp < trunc_ts {
// switch log
out.flush().await?;
*out = BufWriter::new(self.open_log(trunc_ts).await?);
let prev_path = self.archive_path(*prev_stamp);
tokio::spawn(async move { compress(&prev_path).await });
*prev_stamp = trunc_ts;
};
if self.with_prefix {
let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name);
out.write_all(prefix.as_bytes()).await?;
}
out.write_all(&log.line).await?;
2026-04-14 10:52:36 +02:00
if log.line.last() != Some(&b'\n') {
out.write("\n".as_bytes()).await?;
}
2025-07-20 18:48:47 +02:00
Ok(())
}
2026-04-16 11:53:37 +02:00
async fn eventually_open_log(&self, ts: Timestamp) -> File {
loop {
match self.open_log(ts).await {
Ok(log) => break log,
Err(e) => {
error!("open log failed: {e}");
sleep(WRITE_RETRY_DELAY).await;
}
}
}
}
2025-07-20 18:48:47 +02:00
async fn open_log(&self, ts: Timestamp) -> Result<File> {
let log_file = &self.archive_path(ts);
let file = File::options()
.create(true)
.append(true)
.write(true)
.open(log_file)
.await?;
2026-04-16 11:53:37 +02:00
let link_src = &self
.log_path
.join(&self.log_name)
2026-04-14 10:52:36 +02:00
.with_added_extension("log");
let link_tgt = &self.archive_rel_path(ts);
2025-07-20 18:48:47 +02:00
let _ = fs::remove_file(link_src).await;
2026-04-14 10:52:36 +02:00
fs::symlink(link_tgt, link_src).await.map_err(|e| {
format_err!(
"symlink {s} -> {t} failed: {e}",
s = link_src.display(),
t = link_tgt.display()
)
})?;
2025-07-20 18:48:47 +02:00
Ok(file)
}
2026-04-14 10:52:36 +02:00
fn archive_path(&self, ts: Timestamp) -> PathBuf {
2026-04-16 11:53:37 +02:00
self.log_path.join(self.archive_rel_path(ts))
2026-04-14 10:52:36 +02:00
}
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
PathBuf::from("archives").join(self.archive_file(ts))
2025-07-20 18:48:47 +02:00
}
2026-04-16 11:53:37 +02:00
fn archive_file(&self, ts: Timestamp) -> PathBuf {
self.log_name
.with_added_extension(ts.format(TS_FORMAT).to_string())
.with_added_extension("log")
2025-07-20 18:48:47 +02:00
}
}
2026-04-13 21:11:07 +02:00
fn forward_signals_to(pid: i32) {
use nix::{
sys::signal::{kill, Signal},
2026-04-13 21:11:07 +02:00
unistd::Pid,
};
use signal_hook::{consts::*, low_level::register};
2026-04-13 21:11:07 +02:00
2026-04-14 10:52:36 +02:00
debug!("forwarding signals to pid {pid}");
2026-04-13 21:11:07 +02:00
let pid = Pid::from_raw(pid);
let signals = [
2026-04-13 21:11:07 +02:00
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
];
2026-04-13 21:11:07 +02:00
for sig in signals {
let Ok(signal) = Signal::try_from(sig) else {
2026-04-13 21:11:07 +02:00
continue;
};
unsafe {
register(sig, move || {
2026-04-14 10:52:36 +02:00
debug!("forwarding {signal} to {pid}");
let _ = kill(pid, signal);
})
.ok();
}
2026-04-13 21:11:07 +02:00
}
}
2025-07-20 18:48:47 +02:00
struct LogItem {
stream_name: &'static str,
ts: chrono::DateTime<chrono::Utc>,
line: Vec<u8>,
}
2026-04-14 10:52:36 +02:00
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
2025-07-20 18:48:47 +02:00
let buf_size = page_size::get();
2026-04-14 10:52:36 +02:00
let mut out = BufReader::with_capacity(buf_size, out);
let mut line = Vec::with_capacity(buf_size);
macro_rules! send_line {
() => {
let log = LogItem {
stream_name,
ts: chrono::Utc::now(),
line: line.clone(),
};
if let Err(e) = tx.send(log).await {
warn!("send line failed: {e}");
return;
}
line.clear();
};
}
2025-07-20 18:48:47 +02:00
loop {
2026-04-14 10:52:36 +02:00
let Ok(buf) = (out.fill_buf())
.await
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
else {
break;
};
if buf.is_empty() {
2025-07-20 18:48:47 +02:00
break;
}
2026-04-14 10:52:36 +02:00
let remaining = buf_size - line.len();
if let Some(pos) = memchr::memchr(b'\n', buf) {
let len = pos + 1;
let mut buf = &buf[..len];
if len > remaining {
line.extend_from_slice(&buf[..remaining]);
send_line!();
buf = &buf[remaining..];
}
line.extend_from_slice(buf);
out.consume(len);
send_line!();
2026-04-16 11:53:37 +02:00
} else if buf.len() >= remaining {
2026-04-14 10:52:36 +02:00
line.extend_from_slice(&buf[..remaining]);
out.consume(remaining);
send_line!();
} else {
line.extend_from_slice(buf);
let len = buf.len();
out.consume(len);
2025-07-20 18:48:47 +02:00
}
}
2026-04-14 10:52:36 +02:00
if !line.is_empty() {
send_line!();
}
2025-07-20 18:48:47 +02:00
}
2025-07-20 22:30:10 +02:00
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
2025-07-20 18:48:47 +02:00
ts.duration_trunc(TRUNC_DELTA)
.expect("duration_trunc failed")
}
2026-04-16 11:53:37 +02:00
async fn compress_archives(
mut read_dir: fs::ReadDir,
log_name: impl AsRef<Path>,
exclude_ts: String,
) {
let log_name = log_name.as_ref();
2025-07-20 18:48:47 +02:00
loop {
let Ok(Some(entry)) =
(read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
else {
return;
};
let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else {
continue;
};
let Some(name) = file_name.strip_suffix(".log") else {
continue;
};
let Some((name, ts)) = name.rsplit_once('.') else {
continue;
};
if name != log_name {
continue;
}
if ts == exclude_ts {
continue;
}
let file_path = entry.path();
let Some(file_path) = file_path.to_str() else {
continue;
};
compress(file_path).await;
}
}
async fn compress(path: impl AsRef<Path>) {
let path = path.as_ref();
let result = async {
let path_str = path.to_string_lossy();
debug!("compressing {path_str}");
let mut input = File::open(path)
.await
.map_err(|e| format_err!("open {path_str} failed: {e}"))?;
let out_path = path.with_extension("zst");
2025-07-20 18:48:47 +02:00
let out = (File::create(&out_path).await) // create output
.map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?;
let mut out = ZstdEncoder::new(out);
async {
tokio::io::copy(&mut input, &mut out).await?;
2026-04-14 10:52:36 +02:00
out.shutdown().await
2025-07-20 18:48:47 +02:00
}
.await
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
fs::remove_file(path)
.await
2026-04-14 10:52:36 +02:00
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
2025-07-20 18:48:47 +02:00
}
.await;
if let Err(e) = result {
warn!("{e}");
}
}
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
2026-04-14 10:52:36 +02:00
let format = &format!("{TS_FORMAT}%M%S");
let full_ts = &format!("{ts}0000");
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
2025-07-20 18:48:47 +02:00
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
}
2026-04-13 21:11:07 +02:00
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
2025-07-20 18:48:47 +02:00
let mut dir = PathBuf::from(log_path);
dir.push("archives");
let mut entries = Vec::new();
2026-04-13 21:11:07 +02:00
let mut read_dir = fs::read_dir(&dir).await?;
2025-07-20 18:48:47 +02:00
2026-04-13 21:11:07 +02:00
while let Some(entry) =
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
{
2025-07-20 18:48:47 +02:00
let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else {
continue;
};
let Some((name, ext)) = file_name.rsplit_once('.') else {
2025-07-20 18:48:47 +02:00
continue;
};
let compressed = match ext {
"zst" => true,
"log" => false,
_ => continue,
};
2025-07-20 18:48:47 +02:00
let Some((name, timestamp)) = name.rsplit_once('.') else {
continue;
};
if name != log_name {
continue;
};
let Ok(timestamp) =
parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}"))
else {
continue;
};
entries.push(LogFile {
path: entry.path(),
compressed,
timestamp,
});
}
Ok(entries)
}
2026-04-14 10:52:36 +02:00
#[derive(Debug)]
2025-07-20 18:48:47 +02:00
pub struct LogFile {
pub path: PathBuf,
pub compressed: bool,
pub timestamp: Timestamp,
}
2026-04-14 10:52:36 +02:00
impl Eq for LogFile {}
impl PartialEq for LogFile {
fn eq(&self, other: &Self) -> bool {
self.timestamp == other.timestamp
}
}
impl Ord for LogFile {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.timestamp.cmp(&other.timestamp)
}
}
2025-07-20 18:48:47 +02:00
impl PartialOrd for LogFile {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2026-04-14 10:52:36 +02:00
Some(self.cmp(other))
2025-07-20 18:48:47 +02:00
}
}
impl LogFile {
pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result<u64> {
2025-07-20 22:30:10 +02:00
let input = &mut File::open(&self.path).await?;
2025-07-20 18:48:47 +02:00
if self.compressed {
2025-07-20 22:30:10 +02:00
let out = &mut ZstdDecoder::new(out);
tokio::io::copy(input, out).await
2025-07-20 18:48:47 +02:00
} else {
2025-07-20 22:30:10 +02:00
tokio::io::copy(input, out).await
2025-07-20 18:48:47 +02:00
}
}
}