impl apply-config, logger and log
src/bin/dkl.rs (new file)
@@ -0,0 +1,162 @@
use clap::{CommandFactory, Parser, Subcommand};
use eyre::{format_err, Result};
use log::{debug, error};
use tokio::fs;

#[derive(Parser)]
#[command()]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    ApplyConfig {
        /// config file to use
        #[arg(default_value = "config.yaml")]
        config: String,
        /// glob filters to select files to apply
        #[arg(short = 'F', long)]
        filters: Vec<String>,
        /// path prefix (aka chroot)
        #[arg(short = 'P', long, default_value = "/")]
        prefix: String,
    },
    Logger {
        #[arg(long, short = 'p', default_value = "/var/log")]
        log_path: String,
        #[arg(long, short = 'n')]
        log_name: Option<String>,
        #[arg(long)]
        with_prefix: bool,
        command: String,
        args: Vec<String>,
    },
    Log {
        #[arg(long, short = 'p', default_value = "/var/log")]
        log_path: String,
        log_name: String,
        since: Option<String>,
        until: Option<String>,
    },
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    clap_complete::CompleteEnv::with_factory(Cli::command).complete();

    let cli = Cli::parse();

    env_logger::builder()
        .parse_filters("info")
        .parse_default_env()
        .init();

    use Command as C;
    match cli.command {
        C::ApplyConfig {
            config,
            filters,
            prefix,
        } => {
            let filters = parse_globs(&filters)?;
            apply_config(&config, &filters, &prefix).await
        }
        C::Logger {
            ref log_path,
            ref log_name,
            with_prefix,
            command,
            args,
        } => {
            let command = command.as_str();
            let log_name = log_name.as_deref().unwrap_or_else(|| basename(command));

            dkl::logger::Logger {
                log_path,
                log_name,
                with_prefix,
            }
            .run(command, &args)
            .await
        }
        C::Log {
            log_path,
            log_name,
            since,
            until,
        } => {
            let since = parse_ts_arg(since)?;
            let until = parse_ts_arg(until)?;

            let mut files = dkl::logger::log_files(&log_path, &log_name).await?;
            files.sort();

            let mut out = tokio::io::stdout();

            for f in files {
                if !since.is_none_or(|since| f.timestamp >= since) {
                    continue;
                }
                if !until.is_none_or(|until| f.timestamp <= until) {
                    continue;
                }

                debug!("{f:?}");
                f.copy_to(&mut out).await?;
            }
            Ok(())
        }
    }
}

fn parse_ts_arg(ts: Option<String>) -> Result<Option<dkl::logger::Timestamp>> {
    match ts {
        None => Ok(None),
        Some(ts) => {
            let ts = dkl::logger::parse_ts(&ts)
                .map_err(|e| format_err!("invalid timestamp: {ts}: {e}"))?;
            Ok(Some(ts))
        }
    }
}

fn basename(path: &str) -> &str {
    path.rsplit_once('/').map_or(path, |split| split.1)
}

async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> {
    let config = fs::read_to_string(config_file).await?;
    let config: dkl::Config = serde_yaml::from_str(&config)?;

    let files = if filters.is_empty() {
        config.files
    } else {
        (config.files.into_iter())
            .filter(|f| filters.iter().any(|filter| filter.matches(&f.path)))
            .collect()
    };

    dkl::apply::files(&files, chroot).await
}

fn parse_globs(filters: &[String]) -> Result<Vec<glob::Pattern>> {
    let mut errors = false;
    let filters = (filters.iter())
        .filter_map(|s| {
            glob::Pattern::new(s)
                .inspect_err(|e| {
                    error!("invalid filter: {s:?}: {e}");
                    errors = true;
                })
                .ok()
        })
        .collect();

    if errors {
        return Err(format_err!("invalid filters"));
    }

    Ok(filters)
}
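Note on the `--since`/`--until` values accepted by the `log` subcommand: they are parsed by `dkl::logger::parse_ts` (added below in src/logger.rs), which expects the archive timestamp format `%Y%m%d_%H`, i.e. a UTC hour. A minimal sketch of that format, assuming the `dkl` and `eyre` crates from this commit are available as dependencies; the example value is made up:

fn main() -> eyre::Result<()> {
    // "20250102_13" follows TS_FORMAT ("%Y%m%d_%H") and names the UTC hour 2025-01-02 13:00
    let ts = dkl::logger::parse_ts("20250102_13")?;
    assert_eq!(ts.to_rfc3339(), "2025-01-02T13:00:00+00:00");
    Ok(())
}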
@@ -1,6 +1,7 @@
pub mod apply;
pub mod bootstrap;
pub mod dls;
pub mod logger;

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Config {
src/logger.rs (new file)
@@ -0,0 +1,342 @@
use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
use chrono::{DurationRound, TimeDelta, Utc};
use eyre::{format_err, Result};
use log::{debug, error, warn};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::{
    fs::{self, File},
    io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
    process,
    sync::mpsc,
    time::{sleep, Duration},
};

pub type Timestamp = chrono::DateTime<Utc>;

const TS_FORMAT: &str = "%Y%m%d_%H";
const PREFIX_TS_FORMAT: &str = "%H:%M:%S";
const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);

pub struct Logger<'t> {
    pub log_path: &'t str,
    pub log_name: &'t str,
    pub with_prefix: bool,
}

impl<'t> Logger<'t> {
    pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
        // make sure we can at least open the log before starting the command
        let archives_path = &format!("{path}/archives", path = self.log_path);
        (fs::create_dir_all(archives_path).await)
            .map_err(|e| format_err!("failed to create archives dir: {e}"))?;
        let archives_read_dir = (fs::read_dir(archives_path).await)
            .map_err(|e| format_err!("failed to list archives: {e}"))?;

        let mut prev_stamp = ts_trunc(Utc::now());
        let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?);

        tokio::spawn(compress_archives(
            archives_read_dir,
            self.log_name.to_string(),
            prev_stamp.format(TS_FORMAT).to_string(),
        ));

        // start the command
        let mut child = process::Command::new(command)
            .args(args)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;

        let (tx, mut rx) = mpsc::unbounded_channel();

        tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
        tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));

        let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);

        // handle output
        loop {
            tokio::select!(
                r = rx.recv() => {
                    let Some(log) = r else { break; };

                    while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await
                    {
                        error!("log failed: {e}");
                        sleep(WRITE_RETRY_DELAY).await;
                    }
                }
                _ = flush_ticker.tick() => {
                    if let Err(e) = current_log.flush().await {
                        warn!("log flush failed: {e}");
                    }
                }
            );
        }

        // finalize
        while let Err(e) = current_log.flush().await {
            error!("final log flush failed: {e}");
            sleep(WRITE_RETRY_DELAY).await;
        }

        let status = child.wait().await?;
        std::process::exit(status.code().unwrap_or(-1));
    }

    async fn log_item(
        &self,
        log: &LogItem,
        prev_stamp: &mut Timestamp,
        out: &mut BufWriter<File>,
    ) -> Result<()> {
        let trunc_ts = ts_trunc(log.ts);
        if *prev_stamp < trunc_ts {
            // switch log
            out.flush().await?;
            *out = BufWriter::new(self.open_log(trunc_ts).await?);

            let prev_path = self.archive_path(*prev_stamp);
            tokio::spawn(async move { compress(&prev_path).await });

            *prev_stamp = trunc_ts;
        };

        if self.with_prefix {
            let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name);
            out.write_all(prefix.as_bytes()).await?;
        }

        out.write_all(&log.line).await?;
        Ok(())
    }

    async fn open_log(&self, ts: Timestamp) -> Result<File> {
        let log_file = &self.archive_path(ts);

        let file = File::options()
            .create(true)
            .append(true)
            .write(true)
            .open(log_file)
            .await?;

        let link_src = &format!(
            "{path}/{name}.log",
            path = self.log_path,
            name = self.log_name
        );
        let link_tgt = log_file
            .strip_prefix(&format!("{}/", self.log_path))
            .unwrap_or(log_file);

        let _ = fs::remove_file(link_src).await;
        fs::symlink(link_tgt, link_src)
            .await
            .map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}"))?;

        Ok(file)
    }

    fn archive_path(&self, ts: Timestamp) -> String {
        format!(
            "{path}/archives/{file}",
            path = self.log_path,
            file = self.archive_file(ts)
        )
    }
    fn archive_file(&self, ts: Timestamp) -> String {
        format!(
            "{name}.{ts}.log",
            name = self.log_name,
            ts = ts.format(TS_FORMAT),
        )
    }
}

struct LogItem {
    stream_name: &'static str,
    ts: chrono::DateTime<chrono::Utc>,
    line: Vec<u8>,
}

async fn copy(
    stream_name: &'static str,
    out: impl AsyncRead + Unpin,
    tx: mpsc::UnboundedSender<LogItem>,
) {
    let mut out = BufReader::new(out);
    let buf_size = page_size::get();
    loop {
        let mut line = Vec::with_capacity(buf_size);
        if let Err(e) = out.read_until(b'\n', &mut line).await {
            warn!("read {stream_name} failed: {e}");
            return;
        }
        if line.is_empty() {
            break;
        }

        let log = LogItem {
            stream_name,
            ts: chrono::Utc::now(),
            line,
        };
        if let Err(e) = tx.send(log) {
            warn!("send line failed: {e}");
            return;
        }
    }
}

fn ts_trunc(ts: Timestamp) -> Timestamp {
    ts.duration_trunc(TRUNC_DELTA)
        .expect("duration_trunc failed")
}

async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) {
    loop {
        let Ok(Some(entry)) =
            (read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
        else {
            return;
        };

        let file_name = entry.file_name();
        let Some(file_name) = file_name.to_str() else {
            continue;
        };
        let Some(name) = file_name.strip_suffix(".log") else {
            continue;
        };
        let Some((name, ts)) = name.rsplit_once('.') else {
            continue;
        };
        if name != log_name {
            continue;
        }
        if ts == exclude_ts {
            continue;
        }

        let file_path = entry.path();
        let Some(file_path) = file_path.to_str() else {
            continue;
        };

        compress(file_path).await;
    }
}

async fn compress(path: impl AsRef<Path>) {
    let path = path.as_ref();

    let result = async {
        let path_str = path.to_string_lossy();
        debug!("compressing {path_str}");

        let mut input = File::open(path)
            .await
            .map_err(|e| format_err!("open {path_str} failed: {e}"))?;

        // keep the ".log" part of the name so log_files() still recognizes
        // the archive after compression ("<name>.<ts>.log.zstd")
        let out_path = path.with_extension("log.zstd");
        let out = (File::create(&out_path).await) // create output
            .map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?;

        let mut out = ZstdEncoder::new(out);
        async {
            tokio::io::copy(&mut input, &mut out).await?;
            // shutdown finalizes the zstd frame; a plain flush leaves it incomplete
            out.shutdown().await
        }
        .await
        .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;

        fs::remove_file(path)
            .await
            .map_err(|e| format_err!("remove {path_str} failed: {e}"))
    }
    .await;

    if let Err(e) = result {
        warn!("{e}");
    }
}

pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
    let dt =
        chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
    Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
}

pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
    let mut dir = PathBuf::from(log_path);
    dir.push("archives");

    let mut entries = Vec::new();
    let mut read_dir = fs::read_dir(dir).await?;

    while let Some(entry) = read_dir.next_entry().await? {
        let file_name = entry.file_name();
        let Some(file_name) = file_name.to_str() else {
            continue;
        };

        let (name, compressed) = file_name
            .strip_suffix(".zstd")
            .map_or((file_name, false), |s| (s, true));

        let Some(name) = name.strip_suffix(".log") else {
            continue;
        };

        let Some((name, timestamp)) = name.rsplit_once('.') else {
            continue;
        };
        if name != log_name {
            continue;
        };

        let Ok(timestamp) =
            parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}"))
        else {
            continue;
        };

        entries.push(LogFile {
            path: entry.path(),
            compressed,
            timestamp,
        });
    }

    Ok(entries)
}

#[derive(Debug, PartialEq, Eq)]
pub struct LogFile {
    pub path: PathBuf,
    pub compressed: bool,
    pub timestamp: Timestamp,
}

// order archives chronologically, keeping Ord and PartialOrd consistent
impl Ord for LogFile {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.timestamp.cmp(&other.timestamp)
    }
}

impl PartialOrd for LogFile {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl LogFile {
    pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result<u64> {
        let mut input = File::open(&self.path).await?;
        if self.compressed {
            let mut out = ZstdDecoder::new(out);
            tokio::io::copy(&mut input, &mut out).await
        } else {
            tokio::io::copy(&mut input, out).await
        }
    }
}
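For reference, a minimal usage sketch of the new logger API, not part of the commit: it wraps a command with `Logger` so its stdout/stderr land in hourly archives; the log path and the wrapped command here are made up for the example.

// Illustrative sketch only; assumes the dkl, tokio and eyre crates are dependencies.
#[tokio::main(flavor = "current_thread")]
async fn main() -> eyre::Result<()> {
    dkl::logger::Logger {
        log_path: "/tmp/dkl-logs",
        log_name: "demo",
        with_prefix: true,
    }
    // Logger::run pipes the child's stdout/stderr into the hourly archive files
    // and exits with the child's status code when it terminates.
    .run("/bin/echo", &["hello".to_string()])
    .await
}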