Compare commits
2 Commits
v1.1.0
..
d7d32efb43
| Author | SHA1 | Date | |
|---|---|---|---|
| d7d32efb43 | |||
| fe82ef4b17 |
Generated
+1
-2
@@ -334,7 +334,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "dkl"
|
||||
version = "1.1.0"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"async-compression",
|
||||
"base32",
|
||||
@@ -353,7 +353,6 @@ dependencies = [
|
||||
"human-units",
|
||||
"log",
|
||||
"lz4",
|
||||
"memchr",
|
||||
"nix",
|
||||
"openssl",
|
||||
"page_size",
|
||||
|
||||
+1
-2
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "dkl"
|
||||
version = "1.1.0"
|
||||
version = "1.0.0"
|
||||
edition = "2024"
|
||||
|
||||
[profile.release]
|
||||
@@ -28,7 +28,6 @@ hex = "0.4.3"
|
||||
human-units = "0.5.3"
|
||||
log = "0.4.27"
|
||||
lz4 = "1.28.1"
|
||||
memchr = "2.8.0"
|
||||
nix = { version = "0.31.2", features = ["process", "signal", "user"] }
|
||||
openssl = "0.10.73"
|
||||
page_size = "0.6.0"
|
||||
|
||||
@@ -176,15 +176,6 @@ impl Cgroup {
|
||||
dir.push(name);
|
||||
Self::read(self.path.child(name), &dir).await
|
||||
}
|
||||
|
||||
pub async fn write_param(
|
||||
&self,
|
||||
name: impl AsRef<StdPath>,
|
||||
value: impl AsRef<[u8]>,
|
||||
) -> fs::Result<()> {
|
||||
let cg_dir = PathBuf::from(self.path.as_ref());
|
||||
fs::write(cg_dir.join(name), value).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Cgroup {
|
||||
|
||||
+45
-67
@@ -51,16 +51,31 @@ impl<'t> Logger<'t> {
|
||||
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
if let Some(cgroup) = cgroup.as_deref() {
|
||||
let mut cg_path = PathBuf::from(cgroup::ROOT);
|
||||
cg_path.push(cgroup);
|
||||
cg_path.push(self.log_name);
|
||||
|
||||
use std::io::ErrorKind as K;
|
||||
match tokio::fs::create_dir(&cg_path).await {
|
||||
Ok(_) => debug!("created dir {}", cg_path.display()),
|
||||
Err(e) if e.kind() == K::AlreadyExists => {
|
||||
debug!("existing dir {}", cg_path.display())
|
||||
let mut parts = cgroup.split('/').chain([self.log_name].into_iter());
|
||||
let mut part = parts.next().unwrap(); // 1 element guaranteed
|
||||
|
||||
loop {
|
||||
cg_path.push(part);
|
||||
|
||||
use std::io::ErrorKind as K;
|
||||
match tokio::fs::create_dir(&cg_path).await {
|
||||
Ok(_) => debug!("created dir {}", cg_path.display()),
|
||||
Err(e) if e.kind() == K::AlreadyExists => {
|
||||
debug!("existing dir {}", cg_path.display())
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
}
|
||||
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()),
|
||||
|
||||
let Some(next_part) = parts.next() else {
|
||||
break;
|
||||
};
|
||||
|
||||
let control_file = &cg_path.join("cgroup.subtree_control");
|
||||
debug!("control file {}", control_file.display());
|
||||
tokio::fs::write(control_file, b"+cpu +memory +pids +io").await?;
|
||||
|
||||
part = next_part;
|
||||
}
|
||||
|
||||
let procs_file = cg_path.join("cgroup.procs");
|
||||
@@ -70,7 +85,7 @@ impl<'t> Logger<'t> {
|
||||
|
||||
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
||||
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
||||
@@ -137,9 +152,6 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
|
||||
out.write_all(&log.line).await?;
|
||||
if log.line.last() != Some(&b'\n') {
|
||||
out.write("↵\n".as_bytes()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -219,66 +231,32 @@ struct LogItem {
|
||||
line: Vec<u8>,
|
||||
}
|
||||
|
||||
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
|
||||
async fn copy(
|
||||
stream_name: &'static str,
|
||||
out: impl AsyncRead + Unpin,
|
||||
tx: mpsc::UnboundedSender<LogItem>,
|
||||
) {
|
||||
let mut out = BufReader::new(out);
|
||||
let buf_size = page_size::get();
|
||||
let mut out = BufReader::with_capacity(buf_size, out);
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
|
||||
macro_rules! send_line {
|
||||
() => {
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line: line.clone(),
|
||||
};
|
||||
if let Err(e) = tx.send(log).await {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
}
|
||||
line.clear();
|
||||
};
|
||||
}
|
||||
|
||||
loop {
|
||||
let Ok(buf) = (out.fill_buf())
|
||||
.await
|
||||
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
|
||||
else {
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
if let Err(e) = out.read_until(b'\n', &mut line).await {
|
||||
warn!("read {stream_name} failed: {e}");
|
||||
return;
|
||||
}
|
||||
if line.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line,
|
||||
};
|
||||
|
||||
if buf.is_empty() {
|
||||
break;
|
||||
if let Err(e) = tx.send(log) {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
}
|
||||
|
||||
let remaining = buf_size - line.len();
|
||||
|
||||
if let Some(pos) = memchr::memchr(b'\n', buf) {
|
||||
let len = pos + 1;
|
||||
let mut buf = &buf[..len];
|
||||
|
||||
if len > remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
send_line!();
|
||||
buf = &buf[remaining..];
|
||||
}
|
||||
|
||||
line.extend_from_slice(buf);
|
||||
out.consume(len);
|
||||
send_line!();
|
||||
} else if buf.len() > remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
out.consume(remaining);
|
||||
send_line!();
|
||||
} else {
|
||||
line.extend_from_slice(buf);
|
||||
let len = buf.len();
|
||||
out.consume(len);
|
||||
}
|
||||
}
|
||||
|
||||
if !line.is_empty() {
|
||||
send_line!();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user