Compare commits
6 Commits
c75d4febb3
..
v1.1.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 6059d81b3d | |||
| f3b3a9b9c7 | |||
| a5026b884d | |||
| c19798f9f0 | |||
| dc936f52ab | |||
| 33fcfbd197 |
Generated
+13
-1
@@ -334,7 +334,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "dkl"
|
||||
version = "1.0.0"
|
||||
version = "1.1.0"
|
||||
dependencies = [
|
||||
"async-compression",
|
||||
"base32",
|
||||
@@ -353,6 +353,7 @@ dependencies = [
|
||||
"human-units",
|
||||
"log",
|
||||
"lz4",
|
||||
"memchr",
|
||||
"nix",
|
||||
"openssl",
|
||||
"page_size",
|
||||
@@ -362,6 +363,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"signal-hook",
|
||||
"tabled",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
@@ -1480,6 +1482,16 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"signal-hook-registry",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.8"
|
||||
|
||||
+4
-2
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "dkl"
|
||||
version = "1.0.0"
|
||||
version = "1.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[profile.release]
|
||||
@@ -28,7 +28,8 @@ hex = "0.4.3"
|
||||
human-units = "0.5.3"
|
||||
log = "0.4.27"
|
||||
lz4 = "1.28.1"
|
||||
nix = { version = "0.31.2", features = ["user"] }
|
||||
memchr = "2.8.0"
|
||||
nix = { version = "0.31.2", features = ["process", "signal", "user"] }
|
||||
openssl = "0.10.73"
|
||||
page_size = "0.6.0"
|
||||
reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false }
|
||||
@@ -37,6 +38,7 @@ rust-argon2 = "3.0.0"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
serde_yaml = "0.9.34"
|
||||
signal-hook = "0.4.4"
|
||||
tabled = "0.20.0"
|
||||
thiserror = "2.0.12"
|
||||
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
|
||||
|
||||
+19
-3
@@ -3,6 +3,7 @@ use eyre::{format_err, Result};
|
||||
use human_units::Duration;
|
||||
use log::{debug, error};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -38,6 +39,9 @@ enum Command {
|
||||
/// prefix log lines with time & stream
|
||||
#[arg(long)]
|
||||
with_prefix: bool,
|
||||
/// exec command in this cgroup
|
||||
#[arg(long)]
|
||||
cgroup: Option<String>,
|
||||
command: String,
|
||||
args: Vec<String>,
|
||||
},
|
||||
@@ -90,7 +94,14 @@ enum Command {
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum CgCmd {
|
||||
Ls,
|
||||
Ls {
|
||||
#[arg(long)]
|
||||
root: Option<PathBuf>,
|
||||
#[arg(long, short = 'X')]
|
||||
exclude: Vec<String>,
|
||||
#[arg(long, short = 'C')]
|
||||
cols: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
@@ -119,6 +130,7 @@ async fn main() -> Result<()> {
|
||||
ref log_path,
|
||||
ref log_name,
|
||||
with_prefix,
|
||||
cgroup,
|
||||
command,
|
||||
args,
|
||||
} => {
|
||||
@@ -130,7 +142,7 @@ async fn main() -> Result<()> {
|
||||
log_name,
|
||||
with_prefix,
|
||||
}
|
||||
.run(command, &args)
|
||||
.run(cgroup, command, &args)
|
||||
.await
|
||||
}
|
||||
C::Log {
|
||||
@@ -169,7 +181,11 @@ async fn main() -> Result<()> {
|
||||
.map(|_| ())?),
|
||||
|
||||
C::Cg { cmd } => match cmd {
|
||||
CgCmd::Ls => Ok(dkl::cgroup::ls().await?),
|
||||
CgCmd::Ls {
|
||||
root,
|
||||
exclude,
|
||||
cols,
|
||||
} => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
+259
-83
@@ -1,29 +1,66 @@
|
||||
use log::warn;
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
use std::io::Result;
|
||||
use std::path::{Path as StdPath, PathBuf};
|
||||
use std::rc::Rc;
|
||||
use std::str::FromStr;
|
||||
use tokio::fs;
|
||||
|
||||
use crate::human::Human;
|
||||
use crate::{fs, human::Human};
|
||||
|
||||
const CGROUP_ROOT: &str = "/sys/fs/cgroup/";
|
||||
pub const ROOT: &str = "/sys/fs/cgroup";
|
||||
|
||||
pub async fn ls() -> Result<()> {
|
||||
let mut cgs = walk(CGROUP_ROOT.to_string()).await;
|
||||
pub async fn ls(
|
||||
parent: Option<impl AsRef<StdPath>>,
|
||||
exclude: &[String],
|
||||
columns: Option<&str>,
|
||||
) -> fs::Result<()> {
|
||||
let mut root = PathBuf::from(ROOT);
|
||||
if let Some(parent) = parent {
|
||||
root = root.join(parent);
|
||||
}
|
||||
|
||||
cgs.sort_by(|a, b| a.path.cmp(&b.path));
|
||||
let cols: [(&str, fn(&Cgroup) -> String); _] = [
|
||||
("wset", |cg| cg.memory.working_set().human()),
|
||||
("anon", |cg| cg.memory.stat.anon.human()),
|
||||
("min", |cg| cg.memory.min.human()),
|
||||
("low", |cg| cg.memory.low.human()),
|
||||
("high", |cg| cg.memory.high.human()),
|
||||
("max", |cg| cg.memory.max.human()),
|
||||
];
|
||||
let cols = if let Some(columns) = columns {
|
||||
(cols.into_iter())
|
||||
.filter(|(n, _)| columns.split(',').any(|col| &col == n))
|
||||
.collect()
|
||||
} else {
|
||||
cols.to_vec()
|
||||
};
|
||||
|
||||
let mut table = tabled::builder::Builder::new();
|
||||
table.push_record(["cgroup", "workg set", "anon", "max"]);
|
||||
table.push_record(["cgroup"].into_iter().chain(cols.iter().map(|(n, _)| *n)));
|
||||
|
||||
for cg in cgs {
|
||||
let name = cg.path.strip_prefix(CGROUP_ROOT).unwrap();
|
||||
table.push_record([
|
||||
name,
|
||||
&cg.memory.working_set().human(),
|
||||
&cg.memory.stat.anon.human(),
|
||||
&cg.memory.max.human(),
|
||||
]);
|
||||
let mut todo = vec![(Cgroup::root(root).await?, vec![], true)];
|
||||
while let Some((cg, p_lasts, last)) = todo.pop() {
|
||||
let mut name = String::new();
|
||||
for last in p_lasts.iter().skip(1) {
|
||||
name.push_str(if *last { " " } else { "| " });
|
||||
}
|
||||
if !p_lasts.is_empty() {
|
||||
name.push_str(if last { "`- " } else { "|- " });
|
||||
}
|
||||
name.push_str(&cg.name());
|
||||
|
||||
table.push_record([name].into_iter().chain(cols.iter().map(|(_, f)| f(&cg))));
|
||||
|
||||
let mut p_lasts = p_lasts.clone();
|
||||
p_lasts.push(last);
|
||||
|
||||
let mut children = cg.read_children().await?;
|
||||
children.sort();
|
||||
todo.extend(
|
||||
(children.into_iter().rev())
|
||||
.filter(|c| !exclude.iter().any(|x| x == &c.path.full_name()))
|
||||
.enumerate()
|
||||
.map(|(i, child)| (child, p_lasts.clone(), i == 0)),
|
||||
);
|
||||
}
|
||||
|
||||
use tabled::settings::{
|
||||
@@ -41,27 +78,207 @@ pub async fn ls() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn walk(root: String) -> Vec<Cgroup> {
|
||||
let mut todo = vec![root];
|
||||
|
||||
let mut results = Vec::new();
|
||||
|
||||
while let Some(path) = todo.pop() {
|
||||
match read(&path, |d| todo.push(d)).await {
|
||||
Ok(cg) => results.push(cg),
|
||||
Err(e) => {
|
||||
warn!("reading dir {path} failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
results
|
||||
pub struct Cgroup {
|
||||
path: Rc<Path>,
|
||||
children: Vec<PathBuf>,
|
||||
memory: Memory,
|
||||
}
|
||||
|
||||
struct Cgroup {
|
||||
path: String,
|
||||
memory: Memory,
|
||||
impl Cgroup {
|
||||
pub async fn root(path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let path = path.as_ref();
|
||||
Self::read(Path::root(path), path).await
|
||||
}
|
||||
|
||||
async fn read(cg_path: Rc<Path>, path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let path = path.as_ref();
|
||||
|
||||
use fs::Error as E;
|
||||
|
||||
let mut rd = fs::read_dir(path).await?;
|
||||
|
||||
let mut cg = Self {
|
||||
path: cg_path,
|
||||
children: Vec::new(),
|
||||
memory: Memory::default(),
|
||||
};
|
||||
|
||||
while let Some(entry) = (rd.next_entry().await).map_err(|e| E::ReadDir(path.into(), e))? {
|
||||
let path = entry.path();
|
||||
|
||||
let Some(file_name) = path.file_name() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if (entry.file_type().await)
|
||||
.map_err(|e| E::Stat(path.clone(), e))?
|
||||
.is_dir()
|
||||
{
|
||||
cg.children.push(file_name.into());
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = file_name.as_encoded_bytes();
|
||||
let Some(idx) = file_name.iter().position(|b| *b == b'.') else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let (controller, param) = file_name.split_at(idx);
|
||||
let param = ¶m[1..];
|
||||
|
||||
match controller {
|
||||
b"memory" => match param {
|
||||
b"current" => cg.memory.current = read_parse(path).await?,
|
||||
b"low" => cg.memory.low = read_parse(path).await?,
|
||||
b"high" => cg.memory.high = read_parse(path).await?,
|
||||
b"min" => cg.memory.min = read_parse(path).await?,
|
||||
b"max" => cg.memory.max = read_parse(path).await?,
|
||||
b"stat" => cg.memory.stat.read_from(path).await?,
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cg)
|
||||
}
|
||||
|
||||
async fn read_children(&self) -> fs::Result<Vec<Self>> {
|
||||
let mut r = Vec::with_capacity(self.children.len());
|
||||
|
||||
let mut dir = PathBuf::from(self.path.as_ref());
|
||||
|
||||
for child_name in &self.children {
|
||||
dir.push(child_name);
|
||||
let child_path = Path::Child(self.path.clone(), child_name.into());
|
||||
r.push(Self::read(child_path.into(), &dir).await?);
|
||||
dir.pop();
|
||||
}
|
||||
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
pub fn name(&self) -> Cow<'_, str> {
|
||||
self.path.name()
|
||||
}
|
||||
|
||||
pub fn full_name(&self) -> String {
|
||||
self.path.full_name()
|
||||
}
|
||||
|
||||
pub fn children(&self) -> impl Iterator<Item = &StdPath> {
|
||||
self.children.iter().map(|n| n.as_path())
|
||||
}
|
||||
|
||||
pub async fn read_child(&self, name: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let name = name.as_ref();
|
||||
let mut dir = PathBuf::from(self.path.as_ref());
|
||||
dir.push(name);
|
||||
Self::read(self.path.child(name), &dir).await
|
||||
}
|
||||
|
||||
pub async fn write_param(
|
||||
&self,
|
||||
name: impl AsRef<StdPath>,
|
||||
value: impl AsRef<[u8]>,
|
||||
) -> fs::Result<()> {
|
||||
let cg_dir = PathBuf::from(self.path.as_ref());
|
||||
fs::write(cg_dir.join(name), value).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Cgroup {
|
||||
fn eq(&self, o: &Self) -> bool {
|
||||
&self.path == &o.path
|
||||
}
|
||||
}
|
||||
impl Eq for Cgroup {}
|
||||
|
||||
impl Ord for Cgroup {
|
||||
fn cmp(&self, o: &Self) -> std::cmp::Ordering {
|
||||
self.path.cmp(&o.path)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for Cgroup {
|
||||
fn partial_cmp(&self, o: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(o))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
enum Path {
|
||||
Root(PathBuf),
|
||||
Child(Rc<Path>, PathBuf),
|
||||
}
|
||||
|
||||
impl Path {
|
||||
fn name(&self) -> Cow<'_, str> {
|
||||
match self {
|
||||
Self::Root(_) => "/".into(),
|
||||
Self::Child(_, n) => n.to_string_lossy(),
|
||||
}
|
||||
}
|
||||
|
||||
fn full_name(&self) -> String {
|
||||
use Path::*;
|
||||
match self {
|
||||
Root(_) => "/".into(),
|
||||
Child(parent, _) => match parent.as_ref() {
|
||||
Root(_) => self.name().into(),
|
||||
Child(_, _) => format!("{}/{}", parent.full_name(), self.name()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn depth(&self) -> usize {
|
||||
use Path::*;
|
||||
match self {
|
||||
Root(_) => 0,
|
||||
Child(p, _) => 1 + p.depth(),
|
||||
}
|
||||
}
|
||||
|
||||
fn root(dir: impl Into<PathBuf>) -> Rc<Self> {
|
||||
Rc::new(Self::Root(dir.into()))
|
||||
}
|
||||
|
||||
fn child(self: &Rc<Self>, name: impl Into<PathBuf>) -> Rc<Self> {
|
||||
Rc::new(Self::Child(self.clone(), name.into()))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Path> for PathBuf {
|
||||
fn from(mut p: &Path) -> Self {
|
||||
let mut stack = Vec::with_capacity(p.depth() + 1);
|
||||
loop {
|
||||
match p {
|
||||
Path::Root(root_path) => {
|
||||
stack.push(root_path);
|
||||
break;
|
||||
}
|
||||
Path::Child(parent, n) => {
|
||||
stack.push(n);
|
||||
p = parent;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let len = stack.iter().map(|p| p.as_os_str().len() + 1).sum::<usize>() - 1;
|
||||
|
||||
let mut buf = PathBuf::with_capacity(len);
|
||||
buf.extend(stack.into_iter().rev());
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_path_to_pathbuf() {
|
||||
let root = Path::root("/a/b");
|
||||
let c1 = root.child("c1");
|
||||
let c1_1 = c1.child("c1-1");
|
||||
|
||||
assert_eq!(PathBuf::from("/a/b/c1"), PathBuf::from(c1.as_ref()));
|
||||
assert_eq!(PathBuf::from("/a/b/c1/c1-1"), PathBuf::from(c1_1.as_ref()));
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -76,7 +293,7 @@ struct Memory {
|
||||
|
||||
impl Memory {
|
||||
/// working set as defined by cAdvisor
|
||||
/// (https://github.com/google/cadvisor/blob/master/container/libcontainer/handler.go#L853-L862)
|
||||
/// (https://github.com/google/cadvisor/blob/e1ccfa9b4cf2e17d74e0f5526b6487b74b704503/container/libcontainer/handler.go#L853-L862)
|
||||
fn working_set(&self) -> Option<u64> {
|
||||
let cur = self.current?;
|
||||
let inactive = self.stat.inactive_file?;
|
||||
@@ -128,62 +345,21 @@ impl FromStr for Max {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read(dir: &str, mut f: impl FnMut(String)) -> Result<Cgroup> {
|
||||
let mut rd = fs::read_dir(dir).await?;
|
||||
|
||||
let mut cg = Cgroup {
|
||||
path: dir.to_string(),
|
||||
memory: Memory::default(),
|
||||
};
|
||||
|
||||
while let Some(entry) = rd.next_entry().await? {
|
||||
let path = entry.path();
|
||||
let Some(path) = path.to_str() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if entry.file_type().await?.is_dir() {
|
||||
f(path.to_string());
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some((_, name)) = path.rsplit_once('/') else {
|
||||
continue;
|
||||
};
|
||||
let Some((controller, param)) = name.split_once('.') else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match controller {
|
||||
"memory" => match param {
|
||||
"current" => cg.memory.current = read_parse(path).await?,
|
||||
"low" => cg.memory.low = read_parse(path).await?,
|
||||
"high" => cg.memory.high = read_parse(path).await?,
|
||||
"min" => cg.memory.min = read_parse(path).await?,
|
||||
"max" => cg.memory.max = read_parse(path).await?,
|
||||
"stat" => cg.memory.stat.read_from(path).await?,
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cg)
|
||||
}
|
||||
|
||||
async fn read_parse<T: FromStr>(path: &str) -> Result<Option<T>>
|
||||
async fn read_parse<T: FromStr>(path: impl AsRef<StdPath>) -> fs::Result<Option<T>>
|
||||
where
|
||||
T::Err: Display,
|
||||
{
|
||||
let path = path.as_ref();
|
||||
|
||||
(fs::read_to_string(path).await?)
|
||||
.trim_ascii()
|
||||
.parse()
|
||||
.map_err(|e| std::io::Error::other(format!("{path}: parse failed: {e}")))
|
||||
.map_err(|e| fs::Error::Other(path.into(), format!("parse failed: {e}")))
|
||||
.map(|v| Some(v))
|
||||
}
|
||||
|
||||
impl MemoryStat {
|
||||
async fn read_from(&mut self, path: &str) -> Result<()> {
|
||||
async fn read_from(&mut self, path: impl AsRef<StdPath>) -> fs::Result<()> {
|
||||
for line in (fs::read_to_string(path).await?).lines() {
|
||||
let Some((key, value)) = line.split_once(' ') else {
|
||||
continue;
|
||||
|
||||
@@ -1,9 +1,53 @@
|
||||
use eyre::Result;
|
||||
use std::fs::Metadata;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::read_dir;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub use tokio::fs::ReadDir;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("{0}: read dir: {1}")]
|
||||
ReadDir(PathBuf, std::io::Error),
|
||||
#[error("{0}: exists: {1}")]
|
||||
Exists(PathBuf, std::io::Error),
|
||||
#[error("{0}: read: {1}")]
|
||||
Read(PathBuf, std::io::Error),
|
||||
#[error("{0}: stat: {1}")]
|
||||
Stat(PathBuf, std::io::Error),
|
||||
#[error("{0}: create dir: {1}")]
|
||||
CreateDir(PathBuf, std::io::Error),
|
||||
#[error("{0}: write: {1}")]
|
||||
Write(PathBuf, std::io::Error),
|
||||
#[error("{0}: remove file: {1}")]
|
||||
RemoveFile(PathBuf, std::io::Error),
|
||||
#[error("{0}: symlink: {1}")]
|
||||
Symlink(PathBuf, std::io::Error),
|
||||
#[error("{0}: {1}")]
|
||||
Other(PathBuf, String),
|
||||
}
|
||||
|
||||
macro_rules! wrap_path {
|
||||
($fn:ident $( ( $( $pname:ident : $ptype:ty ),* ) )? -> $result:ty, $err:ident) => {
|
||||
pub async fn $fn(path: impl AsRef<Path>$($(, $pname: $ptype)*)?) -> Result<$result> {
|
||||
let path = path.as_ref();
|
||||
fs::$fn(path $($(, $pname)*)?).await.map_err(|e| Error::$err(path.into(), e))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
wrap_path!(read_dir -> ReadDir, ReadDir);
|
||||
wrap_path!(try_exists -> bool, Exists);
|
||||
wrap_path!(read -> Vec<u8>, Read);
|
||||
wrap_path!(read_to_string -> String, Read);
|
||||
wrap_path!(create_dir -> (), CreateDir);
|
||||
wrap_path!(create_dir_all -> (), CreateDir);
|
||||
wrap_path!(remove_file -> (), RemoveFile);
|
||||
wrap_path!(symlink(link_src: impl AsRef<Path>) -> (), Symlink);
|
||||
wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write);
|
||||
|
||||
pub fn spawn_walk_dir(
|
||||
dir: impl Into<PathBuf> + Send + 'static,
|
||||
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
|
||||
@@ -24,7 +68,7 @@ pub async fn walk_dir(dir: impl Into<PathBuf>, tx: mpsc::Sender<Result<(PathBuf,
|
||||
let entry = match rd.next_entry().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
if tx.send(Err(e.into())).await.is_err() {
|
||||
if tx.send(Err(Error::ReadDir(dir.clone(), e))).await.is_err() {
|
||||
return;
|
||||
}
|
||||
todo.pop_front(); // skip dir on error
|
||||
|
||||
+5
-5
@@ -1,13 +1,13 @@
|
||||
pub mod apply;
|
||||
pub mod rc;
|
||||
pub mod cgroup;
|
||||
pub mod human;
|
||||
pub mod bootstrap;
|
||||
pub mod cgroup;
|
||||
pub mod dls;
|
||||
pub mod dynlay;
|
||||
pub mod fs;
|
||||
pub mod human;
|
||||
pub mod logger;
|
||||
pub mod proxy;
|
||||
pub mod rc;
|
||||
|
||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Config {
|
||||
@@ -86,11 +86,11 @@ impl Config {
|
||||
}
|
||||
|
||||
pub fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
|
||||
use base64::{prelude::BASE64_STANDARD_NO_PAD as B64, Engine as _};
|
||||
use base64::{Engine as _, prelude::BASE64_STANDARD_NO_PAD as B64};
|
||||
B64.decode(s.trim_end_matches('='))
|
||||
}
|
||||
|
||||
pub fn base64_encode(b: &[u8]) -> String {
|
||||
use base64::{prelude::BASE64_STANDARD as B64, Engine as _};
|
||||
use base64::{Engine as _, prelude::BASE64_STANDARD as B64};
|
||||
B64.encode(b)
|
||||
}
|
||||
|
||||
+161
-58
@@ -1,17 +1,19 @@
|
||||
use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
|
||||
use chrono::{DurationRound, TimeDelta, Utc};
|
||||
use eyre::{Result, format_err};
|
||||
use eyre::{format_err, Result};
|
||||
use log::{debug, error, warn};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use tokio::{
|
||||
fs::{self, File},
|
||||
fs::File,
|
||||
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
||||
process,
|
||||
sync::mpsc,
|
||||
time::{Duration, sleep},
|
||||
time::{sleep, Duration},
|
||||
};
|
||||
|
||||
use crate::{cgroup, fs};
|
||||
|
||||
pub type Timestamp = chrono::DateTime<Utc>;
|
||||
|
||||
const TS_FORMAT: &str = "%Y%m%d_%H";
|
||||
@@ -27,7 +29,7 @@ pub struct Logger<'t> {
|
||||
}
|
||||
|
||||
impl<'t> Logger<'t> {
|
||||
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
|
||||
pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
|
||||
// make sure we can at least open the log before starting the command
|
||||
let archives_path = &format!("{path}/archives", path = self.log_path);
|
||||
(fs::create_dir_all(archives_path).await)
|
||||
@@ -45,20 +47,42 @@ impl<'t> Logger<'t> {
|
||||
));
|
||||
|
||||
// start the command
|
||||
let mut child = process::Command::new(command)
|
||||
.args(args)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
let mut cmd = process::Command::new(command);
|
||||
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
if let Some(cgroup) = cgroup.as_deref() {
|
||||
let mut cg_path = PathBuf::from(cgroup::ROOT);
|
||||
cg_path.push(cgroup);
|
||||
cg_path.push(self.log_name);
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
use std::io::ErrorKind as K;
|
||||
match tokio::fs::create_dir(&cg_path).await {
|
||||
Ok(_) => debug!("created dir {}", cg_path.display()),
|
||||
Err(e) if e.kind() == K::AlreadyExists => {
|
||||
debug!("existing dir {}", cg_path.display())
|
||||
}
|
||||
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()),
|
||||
}
|
||||
|
||||
let procs_file = cg_path.join("cgroup.procs");
|
||||
debug!("procs file {}", procs_file.display());
|
||||
fs::write(&procs_file, b"0").await?;
|
||||
}
|
||||
|
||||
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
|
||||
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
||||
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
||||
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
// forward signals
|
||||
if let Some(child_pid) = child.id() {
|
||||
forward_signals_to(child_pid as i32);
|
||||
}
|
||||
|
||||
// handle output
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
|
||||
loop {
|
||||
tokio::select!(
|
||||
r = rx.recv() => {
|
||||
@@ -78,13 +102,14 @@ impl<'t> Logger<'t> {
|
||||
);
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
|
||||
// finalize
|
||||
while let Err(e) = current_log.flush().await {
|
||||
error!("final log flush failed: {e}");
|
||||
sleep(WRITE_RETRY_DELAY).await;
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
std::process::exit(status.code().unwrap_or(-1));
|
||||
}
|
||||
|
||||
@@ -112,6 +137,9 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
|
||||
out.write_all(&log.line).await?;
|
||||
if log.line.last() != Some(&b'\n') {
|
||||
out.write("↵\n".as_bytes()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -125,29 +153,28 @@ impl<'t> Logger<'t> {
|
||||
.open(log_file)
|
||||
.await?;
|
||||
|
||||
let link_src = &format!(
|
||||
"{path}/{name}.log",
|
||||
path = self.log_path,
|
||||
name = self.log_name
|
||||
);
|
||||
let link_tgt = log_file
|
||||
.strip_prefix(&format!("{}/", self.log_path))
|
||||
.unwrap_or(log_file);
|
||||
let link_src = &PathBuf::from(self.log_path)
|
||||
.join(self.log_name)
|
||||
.with_added_extension("log");
|
||||
let link_tgt = &self.archive_rel_path(ts);
|
||||
|
||||
let _ = fs::remove_file(link_src).await;
|
||||
fs::symlink(link_tgt, link_src)
|
||||
.await
|
||||
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?;
|
||||
fs::symlink(link_tgt, link_src).await.map_err(|e| {
|
||||
format_err!(
|
||||
"symlink {s} -> {t} failed: {e}",
|
||||
s = link_src.display(),
|
||||
t = link_tgt.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
fn archive_path(&self, ts: Timestamp) -> String {
|
||||
format!(
|
||||
"{path}/archives/{file}",
|
||||
path = self.log_path,
|
||||
file = self.archive_file(ts)
|
||||
)
|
||||
fn archive_path(&self, ts: Timestamp) -> PathBuf {
|
||||
PathBuf::from(self.log_path).join(self.archive_rel_path(ts))
|
||||
}
|
||||
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
|
||||
PathBuf::from("archives").join(self.archive_file(ts))
|
||||
}
|
||||
fn archive_file(&self, ts: Timestamp) -> String {
|
||||
format!(
|
||||
@@ -158,39 +185,101 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
fn forward_signals_to(pid: i32) {
|
||||
use nix::{
|
||||
sys::signal::{kill, Signal},
|
||||
unistd::Pid,
|
||||
};
|
||||
use signal_hook::{consts::*, low_level::register};
|
||||
|
||||
debug!("forwarding signals to pid {pid}");
|
||||
|
||||
let pid = Pid::from_raw(pid);
|
||||
let signals = [
|
||||
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
|
||||
];
|
||||
|
||||
for sig in signals {
|
||||
let Ok(signal) = Signal::try_from(sig) else {
|
||||
continue;
|
||||
};
|
||||
unsafe {
|
||||
register(sig, move || {
|
||||
debug!("forwarding {signal} to {pid}");
|
||||
let _ = kill(pid, signal);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LogItem {
|
||||
stream_name: &'static str,
|
||||
ts: chrono::DateTime<chrono::Utc>,
|
||||
line: Vec<u8>,
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
stream_name: &'static str,
|
||||
out: impl AsyncRead + Unpin,
|
||||
tx: mpsc::UnboundedSender<LogItem>,
|
||||
) {
|
||||
let mut out = BufReader::new(out);
|
||||
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
|
||||
let buf_size = page_size::get();
|
||||
let mut out = BufReader::with_capacity(buf_size, out);
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
|
||||
macro_rules! send_line {
|
||||
() => {
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line: line.clone(),
|
||||
};
|
||||
if let Err(e) = tx.send(log).await {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
}
|
||||
line.clear();
|
||||
};
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
if let Err(e) = out.read_until(b'\n', &mut line).await {
|
||||
warn!("read {stream_name} failed: {e}");
|
||||
return;
|
||||
}
|
||||
if line.is_empty() {
|
||||
let Ok(buf) = (out.fill_buf())
|
||||
.await
|
||||
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
|
||||
else {
|
||||
break;
|
||||
};
|
||||
|
||||
if buf.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line,
|
||||
};
|
||||
if let Err(e) = tx.send(log) {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
let remaining = buf_size - line.len();
|
||||
|
||||
if let Some(pos) = memchr::memchr(b'\n', buf) {
|
||||
let len = pos + 1;
|
||||
let mut buf = &buf[..len];
|
||||
|
||||
if len > remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
send_line!();
|
||||
buf = &buf[remaining..];
|
||||
}
|
||||
|
||||
line.extend_from_slice(buf);
|
||||
out.consume(len);
|
||||
send_line!();
|
||||
} else if buf.len() > remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
out.consume(remaining);
|
||||
send_line!();
|
||||
} else {
|
||||
line.extend_from_slice(buf);
|
||||
let len = buf.len();
|
||||
out.consume(len);
|
||||
}
|
||||
}
|
||||
|
||||
if !line.is_empty() {
|
||||
send_line!();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
||||
@@ -250,14 +339,14 @@ async fn compress(path: impl AsRef<Path>) {
|
||||
let mut out = ZstdEncoder::new(out);
|
||||
async {
|
||||
tokio::io::copy(&mut input, &mut out).await?;
|
||||
out.flush().await
|
||||
out.shutdown().await
|
||||
}
|
||||
.await
|
||||
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
|
||||
|
||||
fs::remove_file(path)
|
||||
.await
|
||||
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
||||
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
||||
}
|
||||
.await;
|
||||
|
||||
@@ -267,19 +356,22 @@ async fn compress(path: impl AsRef<Path>) {
|
||||
}
|
||||
|
||||
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
|
||||
let dt =
|
||||
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
|
||||
let format = &format!("{TS_FORMAT}%M%S");
|
||||
let full_ts = &format!("{ts}0000");
|
||||
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
|
||||
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
|
||||
}
|
||||
|
||||
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
|
||||
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
|
||||
let mut dir = PathBuf::from(log_path);
|
||||
dir.push("archives");
|
||||
|
||||
let mut entries = Vec::new();
|
||||
let mut read_dir = fs::read_dir(dir).await?;
|
||||
let mut read_dir = fs::read_dir(&dir).await?;
|
||||
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
while let Some(entry) =
|
||||
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
|
||||
{
|
||||
let file_name = entry.file_name();
|
||||
let Some(file_name) = file_name.to_str() else {
|
||||
continue;
|
||||
@@ -318,16 +410,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Ord)]
|
||||
#[derive(Debug)]
|
||||
pub struct LogFile {
|
||||
pub path: PathBuf,
|
||||
pub compressed: bool,
|
||||
pub timestamp: Timestamp,
|
||||
}
|
||||
|
||||
impl Eq for LogFile {}
|
||||
impl PartialEq for LogFile {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.timestamp == other.timestamp
|
||||
}
|
||||
}
|
||||
impl Ord for LogFile {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.timestamp.cmp(&other.timestamp)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for LogFile {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
self.timestamp.partial_cmp(&other.timestamp)
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user