diff --git a/.gitignore b/.gitignore index d29d4a3..d057c42 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ /dls /modd.conf /m1_bootstrap-config +/config.yaml +/dist +/dkl +/tmp diff --git a/Cargo.lock b/Cargo.lock index 43ac1e9..19f2072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,12 +76,32 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + [[package]] name = "atomic-waker" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "backtrace" version = "0.3.75" @@ -123,10 +143,12 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.27" +version = "1.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" +checksum = "5c1599538de2394445747c8cf7935946e3cc27e9625f889d979bfb2aaf569362" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -137,10 +159,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" [[package]] -name = "clap" -version = "4.5.40" +name = "chrono" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "num-traits", +] + +[[package]] +name = "clap" +version = "4.5.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9" dependencies = [ "clap_builder", "clap_derive", @@ -148,9 +179,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d" dependencies = [ "anstream", "anstyle", @@ -160,9 +191,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.54" +version = "4.5.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aad5b1b4de04fead402672b48897030eec1f3bfe1550776322f59f6d6e6a5677" +checksum = "a5abde44486daf70c5be8b8f8f1b66c49f86236edf6fa2abadb4d961c4c6229a" dependencies = [ "clap", "clap_lex", @@ -172,9 +203,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491" dependencies = [ "heck", "proc-macro2", @@ -225,13 +256,17 @@ dependencies = [ name = "dkl" version = "1.0.0" dependencies = [ + "async-compression", "bytes", + "chrono", "clap", "clap_complete", "env_logger", "eyre", "futures-util", + "glob", "log", + "page_size", "reqwest", "serde", "serde_json", @@ -425,10 +460,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] -name = "h2" -version = "0.4.10" +name = "glob" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" + +[[package]] +name = "h2" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" dependencies = [ "atomic-waker", "bytes", @@ -549,9 +590,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb" +checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df" dependencies = [ "base64", "bytes", @@ -696,6 +737,17 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -757,6 +809,16 @@ dependencies = [ "syn", ] +[[package]] +name = "jobserver" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" +dependencies = [ + "getrandom 0.3.3", + "libc", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -840,6 +902,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -905,6 +976,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1008,9 +1089,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.20" +version = "0.12.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabf4c97d9130e2bf606614eb937e86edac8292eaa6f422f995d7e8de1eb1813" +checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" dependencies = [ "base64", "bytes", @@ -1071,22 +1152,22 @@ checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" [[package]] name = "rustix" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "rustls" -version = "0.23.28" +version = "0.23.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" +checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1" dependencies = [ "once_cell", "rustls-pki-types", @@ -1106,9 +1187,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.3" +version = "0.103.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" +checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" dependencies = [ "ring", "rustls-pki-types", @@ -1222,6 +1303,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.10" @@ -1359,15 +1449,18 @@ dependencies = [ [[package]] name = "tokio" -version = "1.45.1" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "pin-project-lite", + "signal-hook-registry", + "slab", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -1962,3 +2055,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.15+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index f5479c7..4e1693a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,18 +3,29 @@ name = "dkl" version = "1.0.0" edition = "2024" +[profile.release] +strip = true +panic = "abort" +opt-level = "z" +lto = true +codegen-units = 1 + [dependencies] +async-compression = { version = "0.4.27", features = ["tokio", "zstd"] } bytes = "1.10.1" +chrono = { version = "0.4.41", default-features = false, features = ["now"] } clap = { version = "4.5.40", features = ["derive", "env"] } clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] } env_logger = "0.11.8" eyre = "0.6.12" futures-util = "0.3.31" +glob = "0.3.2" log = "0.4.27" +page_size = "0.6.0" reqwest = { version = "0.12.20", features = ["json", "stream"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_yaml = "0.9.34" thiserror = "2.0.12" -tokio = { version = "1.45.1", features = ["fs", "macros", "rt"] } +tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] } diff --git a/build-dist b/build-dist new file mode 100755 index 0000000..8240a63 --- /dev/null +++ b/build-dist @@ -0,0 +1 @@ +cargo install --path . --root dist diff --git a/src/bin/dkl.rs b/src/bin/dkl.rs new file mode 100644 index 0000000..2a099ae --- /dev/null +++ b/src/bin/dkl.rs @@ -0,0 +1,162 @@ +use clap::{CommandFactory, Parser, Subcommand}; +use eyre::{format_err, Result}; +use log::{debug, error}; +use tokio::fs; + +#[derive(Parser)] +#[command()] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + ApplyConfig { + /// config file to use + #[arg(default_value = "config.yaml")] + config: String, + /// glob filters to select files to apply + #[arg(short = 'F', long)] + filters: Vec, + /// path prefix (aka chroot) + #[arg(short = 'P', long, default_value = "/")] + prefix: String, + }, + Logger { + #[arg(long, short = 'p', default_value = "/var/log")] + log_path: String, + #[arg(long, short = 'n')] + log_name: Option, + #[arg(long)] + with_prefix: bool, + command: String, + args: Vec, + }, + Log { + #[arg(long, short = 'p', default_value = "/var/log")] + log_path: String, + log_name: String, + since: Option, + until: Option, + }, +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + clap_complete::CompleteEnv::with_factory(Cli::command).complete(); + + let cli = Cli::parse(); + + env_logger::builder() + .parse_filters("info") + .parse_default_env() + .init(); + + use Command as C; + match cli.command { + C::ApplyConfig { + config, + filters, + prefix, + } => { + let filters = parse_globs(&filters)?; + apply_config(&config, &filters, &prefix).await + } + C::Logger { + ref log_path, + ref log_name, + with_prefix, + command, + args, + } => { + let command = command.as_str(); + let log_name = log_name.as_deref().unwrap_or_else(|| basename(command)); + + dkl::logger::Logger { + log_path, + log_name, + with_prefix, + } + .run(command, &args) + .await + } + C::Log { + log_path, + log_name, + since, + until, + } => { + let since = parse_ts_arg(since)?; + let until = parse_ts_arg(until)?; + + let mut files = dkl::logger::log_files(&log_path, &log_name).await?; + files.sort(); + + let mut out = tokio::io::stdout(); + + for f in files { + if !since.is_none_or(|since| f.timestamp >= since) { + continue; + } + if !until.is_none_or(|until| f.timestamp <= until) { + continue; + } + + debug!("{f:?}"); + f.copy_to(&mut out).await?; + } + Ok(()) + } + } +} + +fn parse_ts_arg(ts: Option) -> Result> { + match ts { + None => Ok(None), + Some(ts) => { + let ts = dkl::logger::parse_ts(&ts) + .map_err(|e| format_err!("invalid timestamp: {ts}: {e}"))?; + Ok(Some(ts)) + } + } +} + +fn basename(path: &str) -> &str { + path.rsplit_once('/').map_or(path, |split| split.1) +} + +async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> { + let config = fs::read_to_string(config_file).await?; + let config: dkl::Config = serde_yaml::from_str(&config)?; + + let files = if filters.is_empty() { + config.files + } else { + (config.files.into_iter()) + .filter(|f| filters.iter().any(|filter| filter.matches(&f.path))) + .collect() + }; + + dkl::apply::files(&files, chroot).await +} + +fn parse_globs(filters: &[String]) -> Result> { + let mut errors = false; + let filters = (filters.iter()) + .filter_map(|s| { + glob::Pattern::new(s) + .inspect_err(|e| { + error!("invalid filter: {s:?}: {e}"); + errors = true; + }) + .ok() + }) + .collect(); + + if errors { + return Err(format_err!("invalid filters")); + } + + Ok(filters) +} diff --git a/src/lib.rs b/src/lib.rs index 309b880..3e49221 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod apply; pub mod bootstrap; pub mod dls; +pub mod logger; #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct Config { diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..9378791 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,342 @@ +use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder}; +use chrono::{DurationRound, TimeDelta, Utc}; +use eyre::{format_err, Result}; +use log::{debug, error, warn}; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use tokio::{ + fs::{self, File}, + io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, + process, + sync::mpsc, + time::{sleep, Duration}, +}; + +pub type Timestamp = chrono::DateTime; + +const TS_FORMAT: &str = "%Y%m%d_%H"; +const PREFIX_TS_FORMAT: &str = "%H:%M:%S"; +const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1); +const FLUSH_INTERVAL: Duration = Duration::from_secs(1); +const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1); + +pub struct Logger<'t> { + pub log_path: &'t str, + pub log_name: &'t str, + pub with_prefix: bool, +} + +impl<'t> Logger<'t> { + pub async fn run(&self, command: &str, args: &[String]) -> Result<()> { + // make sure we can at least open the log before starting the command + let archives_path = &format!("{path}/archives", path = self.log_path); + (fs::create_dir_all(archives_path).await) + .map_err(|e| format_err!("failed to create archives dir: {e}"))?; + let archives_read_dir = (fs::read_dir(archives_path).await) + .map_err(|e| format_err!("failed to list archives: {e}"))?; + + let mut prev_stamp = ts_trunc(Utc::now()); + let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?); + + tokio::spawn(compress_archives( + archives_read_dir, + self.log_name.to_string(), + prev_stamp.format(TS_FORMAT).to_string(), + )); + + // start the command + let mut child = process::Command::new(command) + .args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); + tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); + + let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); + + // handle output + loop { + tokio::select!( + r = rx.recv() => { + let Some(log) = r else { break; }; + + while let Err(e) = self.log_item(&log, &mut prev_stamp, &mut current_log).await + { + error!("log failed: {e}"); + sleep(WRITE_RETRY_DELAY).await; + } + } + _ = flush_ticker.tick() => { + if let Err(e) = current_log.flush().await { + warn!("log flush failed: {e}"); + } + } + ); + } + + // finalize + while let Err(e) = current_log.flush().await { + error!("final log flush failed: {e}"); + sleep(WRITE_RETRY_DELAY).await; + } + + let status = child.wait().await?; + std::process::exit(status.code().unwrap_or(-1)); + } + + async fn log_item( + &self, + log: &LogItem, + prev_stamp: &mut Timestamp, + out: &mut BufWriter, + ) -> Result<()> { + let trunc_ts = ts_trunc(log.ts); + if *prev_stamp < trunc_ts { + // switch log + out.flush().await?; + *out = BufWriter::new(self.open_log(trunc_ts).await?); + + let prev_path = self.archive_path(*prev_stamp); + tokio::spawn(async move { compress(&prev_path).await }); + + *prev_stamp = trunc_ts; + }; + + if self.with_prefix { + let prefix = format!("{} {}| ", log.ts.format(PREFIX_TS_FORMAT), log.stream_name); + out.write_all(prefix.as_bytes()).await?; + } + + out.write_all(&log.line).await?; + Ok(()) + } + + async fn open_log(&self, ts: Timestamp) -> Result { + let log_file = &self.archive_path(ts); + + let file = File::options() + .create(true) + .append(true) + .write(true) + .open(log_file) + .await?; + + let link_src = &format!( + "{path}/{name}.log", + path = self.log_path, + name = self.log_name + ); + let link_tgt = log_file + .strip_prefix(&format!("{}/", self.log_path)) + .unwrap_or(log_file); + + let _ = fs::remove_file(link_src).await; + fs::symlink(link_tgt, link_src) + .await + .map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?; + + Ok(file) + } + + fn archive_path(&self, ts: Timestamp) -> String { + format!( + "{path}/archives/{file}", + path = self.log_path, + file = self.archive_file(ts) + ) + } + fn archive_file(&self, ts: Timestamp) -> String { + format!( + "{name}.{ts}.log", + name = self.log_name, + ts = ts.format(TS_FORMAT), + ) + } +} + +struct LogItem { + stream_name: &'static str, + ts: chrono::DateTime, + line: Vec, +} + +async fn copy( + stream_name: &'static str, + out: impl AsyncRead + Unpin, + tx: mpsc::UnboundedSender, +) { + let mut out = BufReader::new(out); + let buf_size = page_size::get(); + loop { + let mut line = Vec::with_capacity(buf_size); + if let Err(e) = out.read_until(b'\n', &mut line).await { + warn!("read {stream_name} failed: {e}"); + return; + } + if line.is_empty() { + break; + } + + let log = LogItem { + stream_name, + ts: chrono::Utc::now(), + line, + }; + if let Err(e) = tx.send(log) { + warn!("send line failed: {e}"); + return; + } + } +} + +fn ts_trunc(ts: Timestamp) -> Timestamp { + ts.duration_trunc(TRUNC_DELTA) + .expect("duration_trunc failed") +} + +async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) { + loop { + let Ok(Some(entry)) = + (read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}")) + else { + return; + }; + + let file_name = entry.file_name(); + let Some(file_name) = file_name.to_str() else { + continue; + }; + let Some(name) = file_name.strip_suffix(".log") else { + continue; + }; + let Some((name, ts)) = name.rsplit_once('.') else { + continue; + }; + if name != log_name { + continue; + } + if ts == exclude_ts { + continue; + } + + let file_path = entry.path(); + let Some(file_path) = file_path.to_str() else { + continue; + }; + + compress(file_path).await; + } +} + +async fn compress(path: impl AsRef) { + let path = path.as_ref(); + + let result = async { + let path_str = path.to_string_lossy(); + debug!("compressing {path_str}"); + + let mut input = File::open(path) + .await + .map_err(|e| format_err!("open {path_str} failed: {e}"))?; + + let out_path = path.with_extension("zstd"); + let out = (File::create(&out_path).await) // create output + .map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?; + + let mut out = ZstdEncoder::new(out); + async { + tokio::io::copy(&mut input, &mut out).await?; + out.flush().await + } + .await + .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; + + fs::remove_file(path) + .await + .map_err(|e| format_err!("remove {path_str} failed: {e}")) + } + .await; + + if let Err(e) = result { + warn!("{e}"); + } +} + +pub fn parse_ts(ts: &str) -> std::result::Result { + let dt = + chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?; + Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) +} + +pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result> { + let mut dir = PathBuf::from(log_path); + dir.push("archives"); + + let mut entries = Vec::new(); + let mut read_dir = fs::read_dir(dir).await?; + + while let Some(entry) = read_dir.next_entry().await? { + let file_name = entry.file_name(); + let Some(file_name) = file_name.to_str() else { + continue; + }; + + let (name, compressed) = file_name + .strip_suffix(".zstd") + .map_or((file_name, false), |s| (s, true)); + + let Some(name) = name.strip_suffix(".log") else { + continue; + }; + + let Some((name, timestamp)) = name.rsplit_once('.') else { + continue; + }; + if name != log_name { + continue; + }; + + let Ok(timestamp) = + parse_ts(timestamp).inspect_err(|e| debug!("invalid timestamp: {timestamp}: {e}")) + else { + continue; + }; + + entries.push(LogFile { + path: entry.path(), + compressed, + timestamp, + }); + } + + Ok(entries) +} + +#[derive(Debug, PartialEq, Eq, Ord)] +pub struct LogFile { + pub path: PathBuf, + pub compressed: bool, + pub timestamp: Timestamp, +} + +impl PartialOrd for LogFile { + fn partial_cmp(&self, other: &Self) -> Option { + self.timestamp.partial_cmp(&other.timestamp) + } +} + +impl LogFile { + pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result { + let mut input = File::open(&self.path).await?; + if self.compressed { + let mut out = ZstdDecoder::new(out); + tokio::io::copy(&mut input, &mut out).await + } else { + tokio::io::copy(&mut input, out).await + } + } +} diff --git a/test-dkl b/test-dkl new file mode 100755 index 0000000..67cf7f1 --- /dev/null +++ b/test-dkl @@ -0,0 +1,28 @@ +#! /bin/sh +set -ex + +dkl=target/debug/dkl + +test=${1:-log} + +export RUST_LOG=debug + +case $test in + apply-config) + $dkl apply-config -P tmp/test-system -F '*.crt' + ;; + + logger) + $dkl logger --log-path tmp/log --with-prefix -- bash -c 'echo stdout; echo 1>&2 stderr' + + tree tmp/log + cat tmp/log/bash.log + ;; + + log) + $dkl log --log-path tmp/log bash 20250720_12 20250720_16 + ;; + + *) echo 1>&2 "unknown test: $test"; exit 1 ;; +esac + diff --git a/test b/test-dls similarity index 100% rename from test rename to test-dls