19 Commits

Author SHA1 Message Date
Mikaël Cluseau 6059d81b3d chore: Release dkl version 1.1.0 2026-04-14 15:58:24 +02:00
Mikaël Cluseau f3b3a9b9c7 logger: cleanup 2026-04-14 15:57:47 +02:00
Mikaël Cluseau a5026b884d dkl logger: avoid a full thread just to forward signals 2026-04-14 10:06:39 +02:00
Mikaël Cluseau c19798f9f0 dkl cg ls: more cols + customizable list 2026-04-14 09:22:52 +02:00
Mikaël Cluseau dc936f52ab logger: add cgroup option 2026-04-13 21:11:07 +02:00
Mikaël Cluseau 33fcfbd197 add cg ls, prepare for rc subcommands 2026-04-13 14:34:53 +02:00
Mikaël Cluseau 0f116e21b9 release script 2026-03-17 16:50:45 +01:00
Mikaël Cluseau aa7f15516c bump dockerfile 2026-03-16 11:20:48 +01:00
Mikaël Cluseau 4619899e65 dls: add password hash function 2026-03-16 11:06:19 +01:00
Mikaël Cluseau 4b1edb2a55 reqwest: enable socks 2026-02-25 09:45:59 +01:00
Mikaël Cluseau d449fc8dcf dkl apply-config --dry-run 2026-02-21 18:15:39 +01:00
Mikaël Cluseau ddc82199fb cargo update 2026-02-21 08:18:07 +01:00
Mikaël Cluseau 61d31bc22c dls::Config.extra_ca_certs 2026-02-21 08:17:42 +01:00
Mikaël Cluseau d2293df011 base64: be a tolerant reader 2026-02-10 21:23:11 +01:00
Mikaël Cluseau 723cecff1b fix vg name compat 2026-02-10 15:41:40 +01:00
Mikaël Cluseau e8c9ee9885 wow base64 w/ and wo/ padding are incompatible 2026-01-25 21:59:23 +01:00
Mikaël Cluseau 6a6536bdfb files: add content64 for base64 encoded values 2026-01-25 20:01:50 +01:00
Mikaël Cluseau a6dc420275 cargo update 2026-01-07 18:24:14 +01:00
Mikaël Cluseau d9fa31ec33 bootstrap: impl Default for NetworkInterface 2026-01-07 18:24:02 +01:00
17 changed files with 1378 additions and 423 deletions
Generated
+567 -339
View File
File diff suppressed because it is too large Load Diff
+9 -3
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "dkl" name = "dkl"
version = "1.0.0" version = "1.1.0"
edition = "2024" edition = "2024"
[profile.release] [profile.release]
@@ -13,6 +13,7 @@ codegen-units = 1
[dependencies] [dependencies]
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] } async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
base32 = "0.5.1" base32 = "0.5.1"
base64 = "0.22.1"
bytes = "1.10.1" bytes = "1.10.1"
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] } chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
clap = { version = "4.5.40", features = ["derive", "env"] } clap = { version = "4.5.40", features = ["derive", "env"] }
@@ -27,13 +28,18 @@ hex = "0.4.3"
human-units = "0.5.3" human-units = "0.5.3"
log = "0.4.27" log = "0.4.27"
lz4 = "1.28.1" lz4 = "1.28.1"
nix = { version = "0.30.1", features = ["user"] } memchr = "2.8.0"
nix = { version = "0.31.2", features = ["process", "signal", "user"] }
openssl = "0.10.73" openssl = "0.10.73"
page_size = "0.6.0" page_size = "0.6.0"
reqwest = { version = "0.12.20", features = ["json", "stream", "native-tls"] } reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false }
rpassword = "7.4.0"
rust-argon2 = "3.0.0"
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140" serde_json = "1.0.140"
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
signal-hook = "0.4.4"
tabled = "0.20.0"
thiserror = "2.0.12" thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] } tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
+2 -2
View File
@@ -1,4 +1,4 @@
from mcluseau/rust:1.88.0 as build from mcluseau/rust:1.94.0 as build
workdir /app workdir /app
copy . . copy . .
@@ -10,6 +10,6 @@ run \
&& find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ + && find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ +
# ------------------------------------------------------------------------ # ------------------------------------------------------------------------
from alpine:3.22 from alpine:3.23
copy --from=build /dist/ /bin/ copy --from=build /dist/ /bin/
Executable
+17
View File
@@ -0,0 +1,17 @@
set -ex
tag=$(git describe --always)
repo=novit.tech/direktil/dkl:$tag
docker build --push --platform=linux/amd64,linux/arm64 . -t $repo
publish() {
arch=$1
pf=$2
curl --user $(jq '.auths["novit.tech"].auth' ~/.docker/config.json -r |base64 -d) \
--upload-file <(docker run --rm --platform $pf $repo cat /bin/dkl) \
https://novit.tech/api/packages/direktil/generic/dkl/$tag/dkl.$arch
}
publish x86_64 linux/amd64
publish arm64 linux/arm64
+45 -5
View File
@@ -3,21 +3,61 @@ use log::info;
use std::path::Path; use std::path::Path;
use tokio::fs; use tokio::fs;
pub async fn files(files: &[crate::File], root: &str) -> Result<()> { use crate::base64_decode;
pub async fn files(files: &[crate::File], root: &str, dry_run: bool) -> Result<()> {
for file in files { for file in files {
let path = chroot(root, &file.path); let path = chroot(root, &file.path);
let path = Path::new(&path); let path = Path::new(&path);
if let Some(parent) = path.parent() { if !dry_run && let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?; fs::create_dir_all(parent).await?;
} }
use crate::FileKind as K; use crate::FileKind as K;
match &file.kind { match &file.kind {
K::Content(content) => fs::write(path, content.as_bytes()).await?, K::Content(content) => {
K::Dir(true) => fs::create_dir(path).await?, if dry_run {
info!(
"would create {} ({} bytes from content)",
file.path,
content.len()
);
} else {
fs::write(path, content.as_bytes()).await?;
}
}
K::Content64(content) => {
let content = base64_decode(content)?;
if dry_run {
info!(
"would create {} ({} bytes from content64)",
file.path,
content.len()
);
} else {
fs::write(path, content).await?
}
}
K::Dir(true) => {
if dry_run {
info!("would create {} (directory)", file.path);
} else {
fs::create_dir(path).await?;
}
}
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
K::Symlink(tgt) => fs::symlink(tgt, path).await?, K::Symlink(tgt) => {
if dry_run {
info!("would create {} (symlink to {})", file.path, tgt);
} else {
fs::symlink(tgt, path).await?;
}
}
}
if dry_run {
continue;
} }
match file.kind { match file.kind {
+43 -4
View File
@@ -3,6 +3,7 @@ use eyre::{format_err, Result};
use human_units::Duration; use human_units::Duration;
use log::{debug, error}; use log::{debug, error};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf;
use tokio::fs; use tokio::fs;
#[derive(Parser)] #[derive(Parser)]
@@ -24,6 +25,9 @@ enum Command {
/// path prefix (aka chroot) /// path prefix (aka chroot)
#[arg(short = 'P', long, default_value = "/")] #[arg(short = 'P', long, default_value = "/")]
prefix: String, prefix: String,
/// don't really write files
#[arg(long)]
dry_run: bool,
}, },
Logger { Logger {
/// Path where the logs are stored /// Path where the logs are stored
@@ -35,6 +39,9 @@ enum Command {
/// prefix log lines with time & stream /// prefix log lines with time & stream
#[arg(long)] #[arg(long)]
with_prefix: bool, with_prefix: bool,
/// exec command in this cgroup
#[arg(long)]
cgroup: Option<String>,
command: String, command: String,
args: Vec<String>, args: Vec<String>,
}, },
@@ -78,6 +85,23 @@ enum Command {
#[arg(long, default_value = "5s")] #[arg(long, default_value = "5s")]
timeout: Duration, timeout: Duration,
}, },
Cg {
#[command(subcommand)]
cmd: CgCmd,
},
}
#[derive(Subcommand)]
enum CgCmd {
Ls {
#[arg(long)]
root: Option<PathBuf>,
#[arg(long, short = 'X')]
exclude: Vec<String>,
#[arg(long, short = 'C')]
cols: Option<String>,
},
} }
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
@@ -97,14 +121,16 @@ async fn main() -> Result<()> {
config, config,
filters, filters,
prefix, prefix,
dry_run,
} => { } => {
let filters = parse_globs(&filters)?; let filters = parse_globs(&filters)?;
apply_config(&config, &filters, &prefix).await apply_config(&config, &filters, &prefix, dry_run).await
} }
C::Logger { C::Logger {
ref log_path, ref log_path,
ref log_name, ref log_name,
with_prefix, with_prefix,
cgroup,
command, command,
args, args,
} => { } => {
@@ -116,7 +142,7 @@ async fn main() -> Result<()> {
log_name, log_name,
with_prefix, with_prefix,
} }
.run(command, &args) .run(cgroup, command, &args)
.await .await
} }
C::Log { C::Log {
@@ -153,10 +179,23 @@ async fn main() -> Result<()> {
.run() .run()
.await .await
.map(|_| ())?), .map(|_| ())?),
C::Cg { cmd } => match cmd {
CgCmd::Ls {
root,
exclude,
cols,
} => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?),
},
} }
} }
async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> { async fn apply_config(
config_file: &str,
filters: &[glob::Pattern],
chroot: &str,
dry_run: bool,
) -> Result<()> {
let config = fs::read_to_string(config_file).await?; let config = fs::read_to_string(config_file).await?;
let config: dkl::Config = serde_yaml::from_str(&config)?; let config: dkl::Config = serde_yaml::from_str(&config)?;
@@ -168,7 +207,7 @@ async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str
.collect() .collect()
}; };
dkl::apply::files(&files, chroot).await dkl::apply::files(&files, chroot, dry_run).await
} }
#[derive(Subcommand)] #[derive(Subcommand)]
+21 -6
View File
@@ -36,6 +36,10 @@ enum Command {
}, },
#[command(subcommand)] #[command(subcommand)]
DlSet(DlSet), DlSet(DlSet),
/// hash a password
Hash {
salt: String,
},
} }
#[derive(Subcommand)] #[derive(Subcommand)]
@@ -103,14 +107,16 @@ async fn main() -> eyre::Result<()> {
.parse_default_env() .parse_default_env()
.init(); .init();
let token = std::env::var("DLS_TOKEN").map_err(|_| format_err!("DLS_TOKEN should be set"))?; let dls = || {
let token = std::env::var("DLS_TOKEN").expect("DLS_TOKEN should be set");
let dls = dls::Client::new(cli.dls, token); dls::Client::new(cli.dls, token)
};
use Command as C; use Command as C;
match cli.command { match cli.command {
C::Clusters => write_json(&dls.clusters().await?), C::Clusters => write_json(&dls().clusters().await?),
C::Cluster { cluster, command } => { C::Cluster { cluster, command } => {
let dls = dls();
let cluster = dls.cluster(cluster); let cluster = dls.cluster(cluster);
use ClusterCommand as CC; use ClusterCommand as CC;
@@ -155,8 +161,9 @@ async fn main() -> eyre::Result<()> {
} }
} }
} }
C::Hosts => write_json(&dls.hosts().await?), C::Hosts => write_json(&dls().hosts().await?),
C::Host { out, host, asset } => { C::Host { out, host, asset } => {
let dls = dls();
let host_name = host.clone(); let host_name = host.clone();
let host = dls.host(host); let host = dls.host(host);
match asset { match asset {
@@ -171,7 +178,7 @@ async fn main() -> eyre::Result<()> {
C::DlSet(set) => match set { C::DlSet(set) => match set {
DlSet::Sign { expiry, items } => { DlSet::Sign { expiry, items } => {
let req = dls::DownloadSetReq { expiry, items }; let req = dls::DownloadSetReq { expiry, items };
let signed = dls.sign_dl_set(&req).await?; let signed = dls().sign_dl_set(&req).await?;
println!("{signed}"); println!("{signed}");
} }
DlSet::Show { signed_set } => { DlSet::Show { signed_set } => {
@@ -211,11 +218,19 @@ async fn main() -> eyre::Result<()> {
name, name,
asset, asset,
} => { } => {
let dls = dls();
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?; let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
let mut out = create_asset_file(out, &kind, &name, &asset).await?; let mut out = create_asset_file(out, &kind, &name, &asset).await?;
copy_stream(stream, &mut out).await?; copy_stream(stream, &mut out).await?;
} }
}, },
C::Hash { salt } => {
let salt = dkl::base64_decode(&salt)?;
let passphrase = rpassword::prompt_password("password to hash: ")?;
let hash = dls::store::hash_password(&salt, &passphrase)?;
println!("hash (hex): {}", hex::encode(&hash));
println!("hash (base64): {}", dkl::base64_encode(&hash));
}
}; };
Ok(()) Ok(())
+12 -1
View File
@@ -78,6 +78,17 @@ pub struct NetworkInterface {
pub udev: Option<UdevFilter>, pub udev: Option<UdevFilter>,
} }
impl Default for NetworkInterface {
fn default() -> Self {
Self {
var: "iface".into(),
n: 1,
regexps: Vec::new(),
udev: Some(UdevFilter::Eq("INTERFACE".into(), "eth0".into())),
}
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct SSHServer { pub struct SSHServer {
pub listen: String, pub listen: String,
@@ -94,7 +105,7 @@ impl Default for SSHServer {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmVG { pub struct LvmVG {
#[serde(alias = "vg")] #[serde(rename = "vg", alias = "name")]
pub name: String, pub name: String,
pub pvs: LvmPV, pub pvs: LvmPV,
+381
View File
@@ -0,0 +1,381 @@
use std::borrow::Cow;
use std::fmt::Display;
use std::path::{Path as StdPath, PathBuf};
use std::rc::Rc;
use std::str::FromStr;
use crate::{fs, human::Human};
pub const ROOT: &str = "/sys/fs/cgroup";
pub async fn ls(
parent: Option<impl AsRef<StdPath>>,
exclude: &[String],
columns: Option<&str>,
) -> fs::Result<()> {
let mut root = PathBuf::from(ROOT);
if let Some(parent) = parent {
root = root.join(parent);
}
let cols: [(&str, fn(&Cgroup) -> String); _] = [
("wset", |cg| cg.memory.working_set().human()),
("anon", |cg| cg.memory.stat.anon.human()),
("min", |cg| cg.memory.min.human()),
("low", |cg| cg.memory.low.human()),
("high", |cg| cg.memory.high.human()),
("max", |cg| cg.memory.max.human()),
];
let cols = if let Some(columns) = columns {
(cols.into_iter())
.filter(|(n, _)| columns.split(',').any(|col| &col == n))
.collect()
} else {
cols.to_vec()
};
let mut table = tabled::builder::Builder::new();
table.push_record(["cgroup"].into_iter().chain(cols.iter().map(|(n, _)| *n)));
let mut todo = vec![(Cgroup::root(root).await?, vec![], true)];
while let Some((cg, p_lasts, last)) = todo.pop() {
let mut name = String::new();
for last in p_lasts.iter().skip(1) {
name.push_str(if *last { " " } else { "| " });
}
if !p_lasts.is_empty() {
name.push_str(if last { "`- " } else { "|- " });
}
name.push_str(&cg.name());
table.push_record([name].into_iter().chain(cols.iter().map(|(_, f)| f(&cg))));
let mut p_lasts = p_lasts.clone();
p_lasts.push(last);
let mut children = cg.read_children().await?;
children.sort();
todo.extend(
(children.into_iter().rev())
.filter(|c| !exclude.iter().any(|x| x == &c.path.full_name()))
.enumerate()
.map(|(i, child)| (child, p_lasts.clone(), i == 0)),
);
}
use tabled::settings::{
object::{Column, Row},
Alignment, Modify,
};
let mut table = table.build();
table.with(tabled::settings::Style::psql());
table.with(Alignment::right());
table.with(Modify::list(Column::from(0), Alignment::left()));
table.with(Modify::list(Row::from(0), Alignment::left()));
println!("{}", table);
Ok(())
}
pub struct Cgroup {
path: Rc<Path>,
children: Vec<PathBuf>,
memory: Memory,
}
impl Cgroup {
pub async fn root(path: impl AsRef<StdPath>) -> fs::Result<Self> {
let path = path.as_ref();
Self::read(Path::root(path), path).await
}
async fn read(cg_path: Rc<Path>, path: impl AsRef<StdPath>) -> fs::Result<Self> {
let path = path.as_ref();
use fs::Error as E;
let mut rd = fs::read_dir(path).await?;
let mut cg = Self {
path: cg_path,
children: Vec::new(),
memory: Memory::default(),
};
while let Some(entry) = (rd.next_entry().await).map_err(|e| E::ReadDir(path.into(), e))? {
let path = entry.path();
let Some(file_name) = path.file_name() else {
continue;
};
if (entry.file_type().await)
.map_err(|e| E::Stat(path.clone(), e))?
.is_dir()
{
cg.children.push(file_name.into());
continue;
}
let file_name = file_name.as_encoded_bytes();
let Some(idx) = file_name.iter().position(|b| *b == b'.') else {
continue;
};
let (controller, param) = file_name.split_at(idx);
let param = &param[1..];
match controller {
b"memory" => match param {
b"current" => cg.memory.current = read_parse(path).await?,
b"low" => cg.memory.low = read_parse(path).await?,
b"high" => cg.memory.high = read_parse(path).await?,
b"min" => cg.memory.min = read_parse(path).await?,
b"max" => cg.memory.max = read_parse(path).await?,
b"stat" => cg.memory.stat.read_from(path).await?,
_ => {}
},
_ => {}
}
}
Ok(cg)
}
async fn read_children(&self) -> fs::Result<Vec<Self>> {
let mut r = Vec::with_capacity(self.children.len());
let mut dir = PathBuf::from(self.path.as_ref());
for child_name in &self.children {
dir.push(child_name);
let child_path = Path::Child(self.path.clone(), child_name.into());
r.push(Self::read(child_path.into(), &dir).await?);
dir.pop();
}
Ok(r)
}
pub fn name(&self) -> Cow<'_, str> {
self.path.name()
}
pub fn full_name(&self) -> String {
self.path.full_name()
}
pub fn children(&self) -> impl Iterator<Item = &StdPath> {
self.children.iter().map(|n| n.as_path())
}
pub async fn read_child(&self, name: impl AsRef<StdPath>) -> fs::Result<Self> {
let name = name.as_ref();
let mut dir = PathBuf::from(self.path.as_ref());
dir.push(name);
Self::read(self.path.child(name), &dir).await
}
pub async fn write_param(
&self,
name: impl AsRef<StdPath>,
value: impl AsRef<[u8]>,
) -> fs::Result<()> {
let cg_dir = PathBuf::from(self.path.as_ref());
fs::write(cg_dir.join(name), value).await
}
}
impl PartialEq for Cgroup {
fn eq(&self, o: &Self) -> bool {
&self.path == &o.path
}
}
impl Eq for Cgroup {}
impl Ord for Cgroup {
fn cmp(&self, o: &Self) -> std::cmp::Ordering {
self.path.cmp(&o.path)
}
}
impl PartialOrd for Cgroup {
fn partial_cmp(&self, o: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(o))
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Path {
Root(PathBuf),
Child(Rc<Path>, PathBuf),
}
impl Path {
fn name(&self) -> Cow<'_, str> {
match self {
Self::Root(_) => "/".into(),
Self::Child(_, n) => n.to_string_lossy(),
}
}
fn full_name(&self) -> String {
use Path::*;
match self {
Root(_) => "/".into(),
Child(parent, _) => match parent.as_ref() {
Root(_) => self.name().into(),
Child(_, _) => format!("{}/{}", parent.full_name(), self.name()),
},
}
}
fn depth(&self) -> usize {
use Path::*;
match self {
Root(_) => 0,
Child(p, _) => 1 + p.depth(),
}
}
fn root(dir: impl Into<PathBuf>) -> Rc<Self> {
Rc::new(Self::Root(dir.into()))
}
fn child(self: &Rc<Self>, name: impl Into<PathBuf>) -> Rc<Self> {
Rc::new(Self::Child(self.clone(), name.into()))
}
}
impl From<&Path> for PathBuf {
fn from(mut p: &Path) -> Self {
let mut stack = Vec::with_capacity(p.depth() + 1);
loop {
match p {
Path::Root(root_path) => {
stack.push(root_path);
break;
}
Path::Child(parent, n) => {
stack.push(n);
p = parent;
}
}
}
let len = stack.iter().map(|p| p.as_os_str().len() + 1).sum::<usize>() - 1;
let mut buf = PathBuf::with_capacity(len);
buf.extend(stack.into_iter().rev());
buf
}
}
#[test]
fn test_path_to_pathbuf() {
let root = Path::root("/a/b");
let c1 = root.child("c1");
let c1_1 = c1.child("c1-1");
assert_eq!(PathBuf::from("/a/b/c1"), PathBuf::from(c1.as_ref()));
assert_eq!(PathBuf::from("/a/b/c1/c1-1"), PathBuf::from(c1_1.as_ref()));
}
#[derive(Default)]
struct Memory {
current: Option<u64>,
low: Option<u64>,
high: Option<Max>,
min: Option<u64>,
max: Option<Max>,
stat: MemoryStat,
}
impl Memory {
/// working set as defined by cAdvisor
/// (https://github.com/google/cadvisor/blob/e1ccfa9b4cf2e17d74e0f5526b6487b74b704503/container/libcontainer/handler.go#L853-L862)
fn working_set(&self) -> Option<u64> {
let cur = self.current?;
let inactive = self.stat.inactive_file?;
(inactive <= cur).then(|| cur - inactive)
}
}
#[derive(Default)]
struct MemoryStat {
anon: Option<u64>,
file: Option<u64>,
kernel: Option<u64>,
kernel_stack: Option<u64>,
pagetables: Option<u64>,
shmem: Option<u64>,
inactive_file: Option<u64>,
}
enum Max {
Num(u64),
Max,
}
impl Display for Max {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Num(n) => write!(f, "{n}"),
Self::Max => f.write_str("max"),
}
}
}
impl Human for Max {
fn human(&self) -> String {
match self {
Self::Num(n) => n.human(),
Self::Max => "+∞".into(),
}
}
}
impl FromStr for Max {
type Err = std::num::ParseIntError;
fn from_str(s: &str) -> std::result::Result<Self, <Self as FromStr>::Err> {
Ok(match s {
"max" => Self::Max,
s => Self::Num(s.parse()?),
})
}
}
async fn read_parse<T: FromStr>(path: impl AsRef<StdPath>) -> fs::Result<Option<T>>
where
T::Err: Display,
{
let path = path.as_ref();
(fs::read_to_string(path).await?)
.trim_ascii()
.parse()
.map_err(|e| fs::Error::Other(path.into(), format!("parse failed: {e}")))
.map(|v| Some(v))
}
impl MemoryStat {
async fn read_from(&mut self, path: impl AsRef<StdPath>) -> fs::Result<()> {
for line in (fs::read_to_string(path).await?).lines() {
let Some((key, value)) = line.split_once(' ') else {
continue;
};
let value = value.parse::<u64>().ok();
match key {
"anon" => self.anon = value,
"file" => self.file = value,
"kernel" => self.kernel = value,
"kernel_stack" => self.kernel_stack = value,
"pagetables" => self.pagetables = value,
"shmem" => self.shmem = value,
"inactive_file" => self.inactive_file = value,
_ => {}
}
}
Ok(())
}
}
+4
View File
@@ -6,6 +6,8 @@ use std::collections::BTreeMap as Map;
use std::fmt::Display; use std::fmt::Display;
use std::net::IpAddr; use std::net::IpAddr;
pub mod store;
pub struct Client { pub struct Client {
base_url: String, base_url: String,
token: String, token: String,
@@ -170,6 +172,8 @@ pub struct Config {
pub host_templates: Vec<HostConfig>, pub host_templates: Vec<HostConfig>,
#[serde(default, rename = "SSLConfig")] #[serde(default, rename = "SSLConfig")]
pub ssl_config: String, pub ssl_config: String,
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub extra_ca_certs: Map<String, String>,
} }
// compensate for go's encoder pitfalls // compensate for go's encoder pitfalls
+17
View File
@@ -0,0 +1,17 @@
pub fn hash_password(salt: &[u8], passphrase: &str) -> argon2::Result<[u8; 32]> {
let hash = argon2::hash_raw(
passphrase.as_bytes(),
salt,
&argon2::Config {
variant: argon2::Variant::Argon2id,
hash_length: 32,
time_cost: 1,
mem_cost: 65536,
thread_mode: argon2::ThreadMode::Parallel,
lanes: 4,
..Default::default()
},
)?;
unsafe { Ok(hash.try_into().unwrap_unchecked()) }
}
+1 -1
View File
@@ -1,4 +1,4 @@
use eyre::{format_err, Result}; use eyre::{Result, format_err};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::{fs, io::AsyncWriteExt, process::Command}; use tokio::{fs, io::AsyncWriteExt, process::Command};
+48 -4
View File
@@ -1,9 +1,53 @@
use eyre::Result;
use std::fs::Metadata; use std::fs::Metadata;
use std::path::PathBuf; use std::path::{Path, PathBuf};
use tokio::fs::read_dir; use tokio::fs;
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub type Result<T> = std::result::Result<T, Error>;
pub use tokio::fs::ReadDir;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}: read dir: {1}")]
ReadDir(PathBuf, std::io::Error),
#[error("{0}: exists: {1}")]
Exists(PathBuf, std::io::Error),
#[error("{0}: read: {1}")]
Read(PathBuf, std::io::Error),
#[error("{0}: stat: {1}")]
Stat(PathBuf, std::io::Error),
#[error("{0}: create dir: {1}")]
CreateDir(PathBuf, std::io::Error),
#[error("{0}: write: {1}")]
Write(PathBuf, std::io::Error),
#[error("{0}: remove file: {1}")]
RemoveFile(PathBuf, std::io::Error),
#[error("{0}: symlink: {1}")]
Symlink(PathBuf, std::io::Error),
#[error("{0}: {1}")]
Other(PathBuf, String),
}
macro_rules! wrap_path {
($fn:ident $( ( $( $pname:ident : $ptype:ty ),* ) )? -> $result:ty, $err:ident) => {
pub async fn $fn(path: impl AsRef<Path>$($(, $pname: $ptype)*)?) -> Result<$result> {
let path = path.as_ref();
fs::$fn(path $($(, $pname)*)?).await.map_err(|e| Error::$err(path.into(), e))
}
};
}
wrap_path!(read_dir -> ReadDir, ReadDir);
wrap_path!(try_exists -> bool, Exists);
wrap_path!(read -> Vec<u8>, Read);
wrap_path!(read_to_string -> String, Read);
wrap_path!(create_dir -> (), CreateDir);
wrap_path!(create_dir_all -> (), CreateDir);
wrap_path!(remove_file -> (), RemoveFile);
wrap_path!(symlink(link_src: impl AsRef<Path>) -> (), Symlink);
wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write);
pub fn spawn_walk_dir( pub fn spawn_walk_dir(
dir: impl Into<PathBuf> + Send + 'static, dir: impl Into<PathBuf> + Send + 'static,
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> { ) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
@@ -24,7 +68,7 @@ pub async fn walk_dir(dir: impl Into<PathBuf>, tx: mpsc::Sender<Result<(PathBuf,
let entry = match rd.next_entry().await { let entry = match rd.next_entry().await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
if tx.send(Err(e.into())).await.is_err() { if tx.send(Err(Error::ReadDir(dir.clone(), e))).await.is_err() {
return; return;
} }
todo.pop_front(); // skip dir on error todo.pop_front(); // skip dir on error
+35
View File
@@ -0,0 +1,35 @@
use human_units::FormatSize;
use std::fmt::{Display, Formatter, Result};
pub trait Human {
fn human(&self) -> String;
}
pub struct Quantity(u64);
impl Display for Quantity {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
self.0.format_size().fmt(f)
}
}
impl Human for Quantity {
fn human(&self) -> String {
self.to_string()
}
}
impl Human for u64 {
fn human(&self) -> String {
self.format_size().to_string()
}
}
impl<T: Human> Human for Option<T> {
fn human(&self) -> String {
match self {
Some(h) => h.human(),
None => "".to_string(),
}
}
}
+16 -2
View File
@@ -1,10 +1,13 @@
pub mod apply; pub mod apply;
pub mod proxy;
pub mod bootstrap; pub mod bootstrap;
pub mod cgroup;
pub mod dls; pub mod dls;
pub mod dynlay; pub mod dynlay;
pub mod fs; pub mod fs;
pub mod human;
pub mod logger; pub mod logger;
pub mod proxy;
pub mod rc;
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)] #[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct Config { pub struct Config {
@@ -63,9 +66,10 @@ pub struct File {
} }
#[derive(Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] #[derive(Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "lowercase")]
pub enum FileKind { pub enum FileKind {
Content(String), Content(String),
Content64(String),
Symlink(String), Symlink(String),
Dir(bool), Dir(bool),
} }
@@ -80,3 +84,13 @@ impl Config {
self.files.iter().find(|f| f.path == path) self.files.iter().find(|f| f.path == path)
} }
} }
pub fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
use base64::{Engine as _, prelude::BASE64_STANDARD_NO_PAD as B64};
B64.decode(s.trim_end_matches('='))
}
pub fn base64_encode(b: &[u8]) -> String {
use base64::{Engine as _, prelude::BASE64_STANDARD as B64};
B64.encode(b)
}
+159 -56
View File
@@ -5,13 +5,15 @@ use log::{debug, error, warn};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Stdio; use std::process::Stdio;
use tokio::{ use tokio::{
fs::{self, File}, fs::File,
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
process, process,
sync::mpsc, sync::mpsc,
time::{sleep, Duration}, time::{sleep, Duration},
}; };
use crate::{cgroup, fs};
pub type Timestamp = chrono::DateTime<Utc>; pub type Timestamp = chrono::DateTime<Utc>;
const TS_FORMAT: &str = "%Y%m%d_%H"; const TS_FORMAT: &str = "%Y%m%d_%H";
@@ -27,7 +29,7 @@ pub struct Logger<'t> {
} }
impl<'t> Logger<'t> { impl<'t> Logger<'t> {
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> { pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
// make sure we can at least open the log before starting the command // make sure we can at least open the log before starting the command
let archives_path = &format!("{path}/archives", path = self.log_path); let archives_path = &format!("{path}/archives", path = self.log_path);
(fs::create_dir_all(archives_path).await) (fs::create_dir_all(archives_path).await)
@@ -45,20 +47,42 @@ impl<'t> Logger<'t> {
)); ));
// start the command // start the command
let mut child = process::Command::new(command) let mut cmd = process::Command::new(command);
.args(args) cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
.stdout(Stdio::piped()) if let Some(cgroup) = cgroup.as_deref() {
.stderr(Stdio::piped()) let mut cg_path = PathBuf::from(cgroup::ROOT);
.spawn()?; cg_path.push(cgroup);
cg_path.push(self.log_name);
let (tx, mut rx) = mpsc::unbounded_channel(); use std::io::ErrorKind as K;
match tokio::fs::create_dir(&cg_path).await {
Ok(_) => debug!("created dir {}", cg_path.display()),
Err(e) if e.kind() == K::AlreadyExists => {
debug!("existing dir {}", cg_path.display())
}
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()),
}
let procs_file = cg_path.join("cgroup.procs");
debug!("procs file {}", procs_file.display());
fs::write(&procs_file, b"0").await?;
}
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
let (tx, mut rx) = mpsc::channel(8);
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); // forward signals
if let Some(child_pid) = child.id() {
forward_signals_to(child_pid as i32);
}
// handle output // handle output
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
loop { loop {
tokio::select!( tokio::select!(
r = rx.recv() => { r = rx.recv() => {
@@ -78,13 +102,14 @@ impl<'t> Logger<'t> {
); );
} }
let status = child.wait().await?;
// finalize // finalize
while let Err(e) = current_log.flush().await { while let Err(e) = current_log.flush().await {
error!("final log flush failed: {e}"); error!("final log flush failed: {e}");
sleep(WRITE_RETRY_DELAY).await; sleep(WRITE_RETRY_DELAY).await;
} }
let status = child.wait().await?;
std::process::exit(status.code().unwrap_or(-1)); std::process::exit(status.code().unwrap_or(-1));
} }
@@ -112,6 +137,9 @@ impl<'t> Logger<'t> {
} }
out.write_all(&log.line).await?; out.write_all(&log.line).await?;
if log.line.last() != Some(&b'\n') {
out.write("\n".as_bytes()).await?;
}
Ok(()) Ok(())
} }
@@ -125,29 +153,28 @@ impl<'t> Logger<'t> {
.open(log_file) .open(log_file)
.await?; .await?;
let link_src = &format!( let link_src = &PathBuf::from(self.log_path)
"{path}/{name}.log", .join(self.log_name)
path = self.log_path, .with_added_extension("log");
name = self.log_name let link_tgt = &self.archive_rel_path(ts);
);
let link_tgt = log_file
.strip_prefix(&format!("{}/", self.log_path))
.unwrap_or(log_file);
let _ = fs::remove_file(link_src).await; let _ = fs::remove_file(link_src).await;
fs::symlink(link_tgt, link_src) fs::symlink(link_tgt, link_src).await.map_err(|e| {
.await format_err!(
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?; "symlink {s} -> {t} failed: {e}",
s = link_src.display(),
t = link_tgt.display()
)
})?;
Ok(file) Ok(file)
} }
fn archive_path(&self, ts: Timestamp) -> String { fn archive_path(&self, ts: Timestamp) -> PathBuf {
format!( PathBuf::from(self.log_path).join(self.archive_rel_path(ts))
"{path}/archives/{file}", }
path = self.log_path, fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
file = self.archive_file(ts) PathBuf::from("archives").join(self.archive_file(ts))
)
} }
fn archive_file(&self, ts: Timestamp) -> String { fn archive_file(&self, ts: Timestamp) -> String {
format!( format!(
@@ -158,39 +185,101 @@ impl<'t> Logger<'t> {
} }
} }
fn forward_signals_to(pid: i32) {
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use signal_hook::{consts::*, low_level::register};
debug!("forwarding signals to pid {pid}");
let pid = Pid::from_raw(pid);
let signals = [
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
];
for sig in signals {
let Ok(signal) = Signal::try_from(sig) else {
continue;
};
unsafe {
register(sig, move || {
debug!("forwarding {signal} to {pid}");
let _ = kill(pid, signal);
})
.ok();
}
}
}
struct LogItem { struct LogItem {
stream_name: &'static str, stream_name: &'static str,
ts: chrono::DateTime<chrono::Utc>, ts: chrono::DateTime<chrono::Utc>,
line: Vec<u8>, line: Vec<u8>,
} }
async fn copy( async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
stream_name: &'static str,
out: impl AsyncRead + Unpin,
tx: mpsc::UnboundedSender<LogItem>,
) {
let mut out = BufReader::new(out);
let buf_size = page_size::get(); let buf_size = page_size::get();
let mut out = BufReader::with_capacity(buf_size, out);
let mut line = Vec::with_capacity(buf_size);
macro_rules! send_line {
() => {
let log = LogItem {
stream_name,
ts: chrono::Utc::now(),
line: line.clone(),
};
if let Err(e) = tx.send(log).await {
warn!("send line failed: {e}");
return;
}
line.clear();
};
}
loop { loop {
let mut line = Vec::with_capacity(buf_size); let Ok(buf) = (out.fill_buf())
if let Err(e) = out.read_until(b'\n', &mut line).await { .await
warn!("read {stream_name} failed: {e}"); .inspect_err(|e| warn!("read {stream_name} failed: {e}"))
return; else {
} break;
if line.is_empty() { };
if buf.is_empty() {
break; break;
} }
let log = LogItem { let remaining = buf_size - line.len();
stream_name,
ts: chrono::Utc::now(), if let Some(pos) = memchr::memchr(b'\n', buf) {
line, let len = pos + 1;
}; let mut buf = &buf[..len];
if let Err(e) = tx.send(log) {
warn!("send line failed: {e}"); if len > remaining {
return; line.extend_from_slice(&buf[..remaining]);
send_line!();
buf = &buf[remaining..];
}
line.extend_from_slice(buf);
out.consume(len);
send_line!();
} else if buf.len() > remaining {
line.extend_from_slice(&buf[..remaining]);
out.consume(remaining);
send_line!();
} else {
line.extend_from_slice(buf);
let len = buf.len();
out.consume(len);
} }
} }
if !line.is_empty() {
send_line!();
}
} }
pub fn trunc_ts(ts: Timestamp) -> Timestamp { pub fn trunc_ts(ts: Timestamp) -> Timestamp {
@@ -250,14 +339,14 @@ async fn compress(path: impl AsRef<Path>) {
let mut out = ZstdEncoder::new(out); let mut out = ZstdEncoder::new(out);
async { async {
tokio::io::copy(&mut input, &mut out).await?; tokio::io::copy(&mut input, &mut out).await?;
out.flush().await out.shutdown().await
} }
.await .await
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?; .map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
fs::remove_file(path) fs::remove_file(path)
.await .await
.map_err(|e| format_err!("remove {path_str} failed: {e}")) .map_err(|e| format_err!("remove {path_str} failed: {e}"))
} }
.await; .await;
@@ -267,19 +356,22 @@ async fn compress(path: impl AsRef<Path>) {
} }
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> { pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
let dt = let format = &format!("{TS_FORMAT}%M%S");
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?; let full_ts = &format!("{ts}0000");
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc)) Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
} }
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> { pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
let mut dir = PathBuf::from(log_path); let mut dir = PathBuf::from(log_path);
dir.push("archives"); dir.push("archives");
let mut entries = Vec::new(); let mut entries = Vec::new();
let mut read_dir = fs::read_dir(dir).await?; let mut read_dir = fs::read_dir(&dir).await?;
while let Some(entry) = read_dir.next_entry().await? { while let Some(entry) =
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
{
let file_name = entry.file_name(); let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else { let Some(file_name) = file_name.to_str() else {
continue; continue;
@@ -318,16 +410,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
Ok(entries) Ok(entries)
} }
#[derive(Debug, PartialEq, Eq, Ord)] #[derive(Debug)]
pub struct LogFile { pub struct LogFile {
pub path: PathBuf, pub path: PathBuf,
pub compressed: bool, pub compressed: bool,
pub timestamp: Timestamp, pub timestamp: Timestamp,
} }
impl Eq for LogFile {}
impl PartialEq for LogFile {
fn eq(&self, other: &Self) -> bool {
self.timestamp == other.timestamp
}
}
impl Ord for LogFile {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.timestamp.cmp(&other.timestamp)
}
}
impl PartialOrd for LogFile { impl PartialOrd for LogFile {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.timestamp.partial_cmp(&other.timestamp) Some(self.cmp(other))
} }
} }
+1
View File
@@ -0,0 +1 @@