feat(dkl rc)
This commit is contained in:
+88
-50
@@ -2,12 +2,13 @@ use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
|
||||
use chrono::{DurationRound, TimeDelta, Utc};
|
||||
use eyre::{format_err, Result};
|
||||
use log::{debug, error, warn};
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
||||
process,
|
||||
process::{Child, Command},
|
||||
sync::mpsc,
|
||||
time::{sleep, Duration},
|
||||
};
|
||||
@@ -22,67 +23,92 @@ const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);
|
||||
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);
|
||||
|
||||
pub struct Logger<'t> {
|
||||
pub log_path: &'t str,
|
||||
pub log_name: &'t str,
|
||||
pub struct Logger {
|
||||
pub log_path: PathBuf,
|
||||
pub log_name: PathBuf,
|
||||
pub with_prefix: bool,
|
||||
pub cgroup: Option<String>,
|
||||
}
|
||||
|
||||
impl<'t> Logger<'t> {
|
||||
pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
|
||||
impl Logger {
|
||||
pub async fn setup<I, S>(&self, command: impl AsRef<OsStr>, args: I) -> fs::Result<Command>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: AsRef<OsStr>,
|
||||
{
|
||||
// make sure we can at least open the log before starting the command
|
||||
let archives_path = &format!("{path}/archives", path = self.log_path);
|
||||
(fs::create_dir_all(archives_path).await)
|
||||
.map_err(|e| format_err!("failed to create archives dir: {e}"))?;
|
||||
let archives_read_dir = (fs::read_dir(archives_path).await)
|
||||
.map_err(|e| format_err!("failed to list archives: {e}"))?;
|
||||
let archives_path = &self.log_path.join("archives");
|
||||
fs::create_dir_all(archives_path).await?;
|
||||
|
||||
let mut prev_stamp = trunc_ts(Utc::now());
|
||||
let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?);
|
||||
let archives_read_dir = fs::read_dir(archives_path).await?;
|
||||
|
||||
let prev_stamp = trunc_ts(Utc::now());
|
||||
|
||||
tokio::spawn(compress_archives(
|
||||
archives_read_dir,
|
||||
self.log_name.to_string(),
|
||||
self.log_name.clone(),
|
||||
prev_stamp.format(TS_FORMAT).to_string(),
|
||||
));
|
||||
|
||||
// start the command
|
||||
let mut cmd = process::Command::new(command);
|
||||
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
if let Some(cgroup) = cgroup.as_deref() {
|
||||
let mut cg_path = PathBuf::from(cgroup::ROOT);
|
||||
cg_path.push(cgroup);
|
||||
cg_path.push(self.log_name);
|
||||
// create the command
|
||||
let mut cmd = Command::new(command);
|
||||
|
||||
use std::io::ErrorKind as K;
|
||||
match tokio::fs::create_dir(&cg_path).await {
|
||||
Ok(_) => debug!("created dir {}", cg_path.display()),
|
||||
Err(e) if e.kind() == K::AlreadyExists => {
|
||||
debug!("existing dir {}", cg_path.display())
|
||||
}
|
||||
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()),
|
||||
}
|
||||
cmd.args(args);
|
||||
|
||||
if let Some(cgroup) = self.cgroup.as_deref() {
|
||||
let cg_path = PathBuf::from(cgroup::ROOT)
|
||||
.join(cgroup)
|
||||
.join(&self.log_name);
|
||||
|
||||
fs::create_dir_all(&cg_path).await?;
|
||||
|
||||
let procs_file = cg_path.join("cgroup.procs");
|
||||
debug!("procs file {}", procs_file.display());
|
||||
fs::write(&procs_file, b"0").await?;
|
||||
|
||||
unsafe { cmd.pre_exec(move || std::fs::write(&procs_file, b"0")) };
|
||||
}
|
||||
|
||||
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
|
||||
Ok(cmd)
|
||||
}
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
pub fn spawn(self, mut cmd: Command) -> std::io::Result<Child> {
|
||||
// setup outputs for capture
|
||||
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
|
||||
// spawn
|
||||
let mut child = cmd.spawn()?;
|
||||
|
||||
// capture outputs
|
||||
let (tx, rx) = mpsc::channel(8);
|
||||
|
||||
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
||||
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
||||
|
||||
// log outputs
|
||||
tokio::spawn(self.log_stream(rx));
|
||||
|
||||
Ok(child)
|
||||
}
|
||||
|
||||
// TODO: Result<!> when stable
|
||||
pub async fn exec(self, cmd: Command) -> Result<()> {
|
||||
let mut child = self.spawn(cmd)?;
|
||||
|
||||
// forward signals
|
||||
if let Some(child_pid) = child.id() {
|
||||
forward_signals_to(child_pid as i32);
|
||||
}
|
||||
|
||||
// handle output
|
||||
let status = child.wait().await?;
|
||||
std::process::exit(status.code().unwrap_or(-1));
|
||||
}
|
||||
|
||||
async fn log_stream(self, mut rx: mpsc::Receiver<LogItem>) {
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
|
||||
let mut prev_stamp = trunc_ts(Utc::now());
|
||||
let mut current_log = BufWriter::new(self.eventually_open_log(prev_stamp).await);
|
||||
|
||||
loop {
|
||||
tokio::select!(
|
||||
r = rx.recv() => {
|
||||
@@ -102,15 +128,11 @@ impl<'t> Logger<'t> {
|
||||
);
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
|
||||
// finalize
|
||||
while let Err(e) = current_log.flush().await {
|
||||
error!("final log flush failed: {e}");
|
||||
while let Err(e) = current_log.shutdown().await {
|
||||
error!("final log shutdown failed: {e}");
|
||||
sleep(WRITE_RETRY_DELAY).await;
|
||||
}
|
||||
|
||||
std::process::exit(status.code().unwrap_or(-1));
|
||||
}
|
||||
|
||||
async fn log_item(
|
||||
@@ -143,6 +165,18 @@ impl<'t> Logger<'t> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn eventually_open_log(&self, ts: Timestamp) -> File {
|
||||
loop {
|
||||
match self.open_log(ts).await {
|
||||
Ok(log) => break log,
|
||||
Err(e) => {
|
||||
error!("open log failed: {e}");
|
||||
sleep(WRITE_RETRY_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn open_log(&self, ts: Timestamp) -> Result<File> {
|
||||
let log_file = &self.archive_path(ts);
|
||||
|
||||
@@ -153,8 +187,9 @@ impl<'t> Logger<'t> {
|
||||
.open(log_file)
|
||||
.await?;
|
||||
|
||||
let link_src = &PathBuf::from(self.log_path)
|
||||
.join(self.log_name)
|
||||
let link_src = &self
|
||||
.log_path
|
||||
.join(&self.log_name)
|
||||
.with_added_extension("log");
|
||||
let link_tgt = &self.archive_rel_path(ts);
|
||||
|
||||
@@ -171,17 +206,15 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
|
||||
fn archive_path(&self, ts: Timestamp) -> PathBuf {
|
||||
PathBuf::from(self.log_path).join(self.archive_rel_path(ts))
|
||||
self.log_path.join(self.archive_rel_path(ts))
|
||||
}
|
||||
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
|
||||
PathBuf::from("archives").join(self.archive_file(ts))
|
||||
}
|
||||
fn archive_file(&self, ts: Timestamp) -> String {
|
||||
format!(
|
||||
"{name}.{ts}.log",
|
||||
name = self.log_name,
|
||||
ts = ts.format(TS_FORMAT),
|
||||
)
|
||||
fn archive_file(&self, ts: Timestamp) -> PathBuf {
|
||||
self.log_name
|
||||
.with_added_extension(ts.format(TS_FORMAT).to_string())
|
||||
.with_added_extension("log")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +299,7 @@ async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::
|
||||
line.extend_from_slice(buf);
|
||||
out.consume(len);
|
||||
send_line!();
|
||||
} else if buf.len() > remaining {
|
||||
} else if buf.len() >= remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
out.consume(remaining);
|
||||
send_line!();
|
||||
@@ -287,7 +320,12 @@ pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
||||
.expect("duration_trunc failed")
|
||||
}
|
||||
|
||||
async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) {
|
||||
async fn compress_archives(
|
||||
mut read_dir: fs::ReadDir,
|
||||
log_name: impl AsRef<Path>,
|
||||
exclude_ts: String,
|
||||
) {
|
||||
let log_name = log_name.as_ref();
|
||||
loop {
|
||||
let Ok(Some(entry)) =
|
||||
(read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
|
||||
|
||||
Reference in New Issue
Block a user