2 Commits

Author SHA1 Message Date
Mikaël Cluseau d7d32efb43 fix nbspc 2026-04-14 11:05:34 +02:00
Mikaël Cluseau fe82ef4b17 logger: cleanup 2026-04-14 10:52:36 +02:00
4 changed files with 47 additions and 80 deletions
Generated
+1 -2
View File
@@ -334,7 +334,7 @@ dependencies = [
[[package]] [[package]]
name = "dkl" name = "dkl"
version = "1.1.0" version = "1.0.0"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"base32", "base32",
@@ -353,7 +353,6 @@ dependencies = [
"human-units", "human-units",
"log", "log",
"lz4", "lz4",
"memchr",
"nix", "nix",
"openssl", "openssl",
"page_size", "page_size",
+1 -2
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "dkl" name = "dkl"
version = "1.1.0" version = "1.0.0"
edition = "2024" edition = "2024"
[profile.release] [profile.release]
@@ -28,7 +28,6 @@ hex = "0.4.3"
human-units = "0.5.3" human-units = "0.5.3"
log = "0.4.27" log = "0.4.27"
lz4 = "1.28.1" lz4 = "1.28.1"
memchr = "2.8.0"
nix = { version = "0.31.2", features = ["process", "signal", "user"] } nix = { version = "0.31.2", features = ["process", "signal", "user"] }
openssl = "0.10.73" openssl = "0.10.73"
page_size = "0.6.0" page_size = "0.6.0"
-9
View File
@@ -176,15 +176,6 @@ impl Cgroup {
dir.push(name); dir.push(name);
Self::read(self.path.child(name), &dir).await Self::read(self.path.child(name), &dir).await
} }
pub async fn write_param(
&self,
name: impl AsRef<StdPath>,
value: impl AsRef<[u8]>,
) -> fs::Result<()> {
let cg_dir = PathBuf::from(self.path.as_ref());
fs::write(cg_dir.join(name), value).await
}
} }
impl PartialEq for Cgroup { impl PartialEq for Cgroup {
+35 -57
View File
@@ -51,8 +51,12 @@ impl<'t> Logger<'t> {
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
if let Some(cgroup) = cgroup.as_deref() { if let Some(cgroup) = cgroup.as_deref() {
let mut cg_path = PathBuf::from(cgroup::ROOT); let mut cg_path = PathBuf::from(cgroup::ROOT);
cg_path.push(cgroup);
cg_path.push(self.log_name); let mut parts = cgroup.split('/').chain([self.log_name].into_iter());
let mut part = parts.next().unwrap(); // 1 element guaranteed
loop {
cg_path.push(part);
use std::io::ErrorKind as K; use std::io::ErrorKind as K;
match tokio::fs::create_dir(&cg_path).await { match tokio::fs::create_dir(&cg_path).await {
@@ -60,7 +64,18 @@ impl<'t> Logger<'t> {
Err(e) if e.kind() == K::AlreadyExists => { Err(e) if e.kind() == K::AlreadyExists => {
debug!("existing dir {}", cg_path.display()) debug!("existing dir {}", cg_path.display())
} }
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()), Err(e) => Err(e)?,
}
let Some(next_part) = parts.next() else {
break;
};
let control_file = &cg_path.join("cgroup.subtree_control");
debug!("control file {}", control_file.display());
tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?;
part = next_part;
} }
let procs_file = cg_path.join("cgroup.procs"); let procs_file = cg_path.join("cgroup.procs");
@@ -70,7 +85,7 @@ impl<'t> Logger<'t> {
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
let (tx, mut rx) = mpsc::channel(8); let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
@@ -137,9 +152,6 @@ impl<'t> Logger<'t> {
} }
out.write_all(&log.line).await?; out.write_all(&log.line).await?;
if log.line.last() != Some(&b'\n') {
out.write("\n".as_bytes()).await?;
}
Ok(()) Ok(())
} }
@@ -219,66 +231,32 @@ struct LogItem {
line: Vec<u8>, line: Vec<u8>,
} }
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) { async fn copy(
stream_name: &'static str,
out: impl AsyncRead + Unpin,
tx: mpsc::UnboundedSender<LogItem>,
) {
let mut out = BufReader::new(out);
let buf_size = page_size::get(); let buf_size = page_size::get();
let mut out = BufReader::with_capacity(buf_size, out); loop {
let mut line = Vec::with_capacity(buf_size); let mut line = Vec::with_capacity(buf_size);
if let Err(e) = out.read_until(b'\n', &mut line).await {
warn!("read {stream_name} failed: {e}");
return;
}
if line.is_empty() {
break;
}
macro_rules! send_line {
() => {
let log = LogItem { let log = LogItem {
stream_name, stream_name,
ts: chrono::Utc::now(), ts: chrono::Utc::now(),
line: line.clone(), line,
}; };
if let Err(e) = tx.send(log).await { if let Err(e) = tx.send(log) {
warn!("send line failed: {e}"); warn!("send line failed: {e}");
return; return;
} }
line.clear();
};
}
loop {
let Ok(buf) = (out.fill_buf())
.await
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
else {
break;
};
if buf.is_empty() {
break;
}
let remaining = buf_size - line.len();
if let Some(pos) = memchr::memchr(b'\n', buf) {
let len = pos + 1;
let mut buf = &buf[..len];
if len > remaining {
line.extend_from_slice(&buf[..remaining]);
send_line!();
buf = &buf[remaining..];
}
line.extend_from_slice(buf);
out.consume(len);
send_line!();
} else if buf.len() > remaining {
line.extend_from_slice(&buf[..remaining]);
out.consume(remaining);
send_line!();
} else {
line.extend_from_slice(buf);
let len = buf.len();
out.consume(len);
}
}
if !line.is_empty() {
send_line!();
} }
} }