37 Commits

Author SHA1 Message Date
Mikaël Cluseau 78d4d4f121 chore: Release dkl version 1.2.2 2026-05-03 15:01:09 +02:00
Mikaël Cluseau 720d433fd9 damn 2026-05-03 15:00:09 +02:00
Mikaël Cluseau c0c2c4afdf File: prepare migration out of flatenned kind 2026-05-03 12:26:29 +02:00
Mikaël Cluseau 71f4faa8cb add composite FileKind 2026-04-27 21:44:46 +02:00
Mikaël Cluseau 5414b1d529 dockerfile bump 2026-04-22 15:51:05 +02:00
Mikaël Cluseau dca467e21d chore: Release dkl version 1.2.1 2026-04-22 15:33:03 +02:00
Mikaël Cluseau 8d36882fbd dynlay: rely on auto-detect fstype 2026-04-22 15:32:28 +02:00
Mikaël Cluseau b1bee39653 more seed fetching options 2026-04-22 10:11:47 +02:00
Mikaël Cluseau 2e87e4d92f chore: Release dkl version 1.2.0 2026-04-17 19:14:57 +02:00
Mikaël Cluseau b58c9fbeb5 build-dist: locked 2026-04-17 19:14:21 +02:00
Mikaël Cluseau 15fe8c9ce6 feat(dkl rc) 2026-04-17 08:28:58 +02:00
Mikaël Cluseau 6059d81b3d chore: Release dkl version 1.1.0 2026-04-14 15:58:24 +02:00
Mikaël Cluseau f3b3a9b9c7 logger: cleanup 2026-04-14 15:57:47 +02:00
Mikaël Cluseau a5026b884d dkl logger: avoid a full thread just to forward signals 2026-04-14 10:06:39 +02:00
Mikaël Cluseau c19798f9f0 dkl cg ls: more cols + customizable list 2026-04-14 09:22:52 +02:00
Mikaël Cluseau dc936f52ab logger: add cgroup option 2026-04-13 21:11:07 +02:00
Mikaël Cluseau 33fcfbd197 add cg ls, prepare for rc subcommands 2026-04-13 14:34:53 +02:00
Mikaël Cluseau 0f116e21b9 release script 2026-03-17 16:50:45 +01:00
Mikaël Cluseau aa7f15516c bump dockerfile 2026-03-16 11:20:48 +01:00
Mikaël Cluseau 4619899e65 dls: add password hash function 2026-03-16 11:06:19 +01:00
Mikaël Cluseau 4b1edb2a55 reqwest: enable socks 2026-02-25 09:45:59 +01:00
Mikaël Cluseau d449fc8dcf dkl apply-config --dry-run 2026-02-21 18:15:39 +01:00
Mikaël Cluseau ddc82199fb cargo update 2026-02-21 08:18:07 +01:00
Mikaël Cluseau 61d31bc22c dls::Config.extra_ca_certs 2026-02-21 08:17:42 +01:00
Mikaël Cluseau d2293df011 base64: be a tolerant reader 2026-02-10 21:23:11 +01:00
Mikaël Cluseau 723cecff1b fix vg name compat 2026-02-10 15:41:40 +01:00
Mikaël Cluseau e8c9ee9885 wow base64 w/ and wo/ padding are incompatible 2026-01-25 21:59:23 +01:00
Mikaël Cluseau 6a6536bdfb files: add content64 for base64 encoded values 2026-01-25 20:01:50 +01:00
Mikaël Cluseau a6dc420275 cargo update 2026-01-07 18:24:14 +01:00
Mikaël Cluseau d9fa31ec33 bootstrap: impl Default for NetworkInterface 2026-01-07 18:24:02 +01:00
Mikaël Cluseau 93e5570293 use human_units for Duration params 2025-12-20 08:52:34 +01:00
Mikaël Cluseau fb3f8942d4 feat: (tcp) proxy 2025-12-19 23:16:06 +01:00
Mikaël Cluseau 7acc9e9a3e bootstrap: impl default for config 2025-12-19 18:21:03 +01:00
Mikaël Cluseau ac90b35142 add dls::Config 2025-12-12 16:58:40 +01:00
Mikaël Cluseau 298366a0aa Config: query files 2025-12-03 12:54:45 +01:00
Mikaël Cluseau ecbbb82c7a more default in dls HostConfig 2025-12-03 09:47:57 +01:00
Mikaël Cluseau ebd2f21d42 adjust initrd_files 2025-11-20 11:58:38 +01:00
20 changed files with 2689 additions and 669 deletions
Generated
+641 -456
View File
File diff suppressed because it is too large Load Diff
+13 -5
View File
@@ -1,6 +1,6 @@
[package]
name = "dkl"
version = "1.0.0"
version = "1.2.2"
edition = "2024"
[profile.release]
@@ -13,25 +13,33 @@ codegen-units = 1
[dependencies]
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
base32 = "0.5.1"
base64 = "0.22.1"
bytes = "1.10.1"
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
clap = { version = "4.5.40", features = ["derive", "env"] }
clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] }
env_logger = "0.11.8"
eyre = "0.6.12"
fastrand = "2.3.0"
futures = "0.3.31"
futures-util = "0.3.31"
glob = "0.3.2"
hex = "0.4.3"
human-units = "0.5.3"
log = "0.4.27"
lz4 = "1.28.1"
nix = { version = "0.30.1", features = ["user"] }
memchr = "2.8.0"
nix = { version = "0.31.2", features = ["fs", "process", "signal", "user"] }
openssl = "0.10.73"
page_size = "0.6.0"
reqwest = { version = "0.12.20", features = ["json", "stream", "native-tls"] }
reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false }
rpassword = "7.4.0"
rust-argon2 = "3.0.0"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_yaml = "0.9.34"
serde_yaml = { version = "0.10.0", package = "serde_yaml_ng" }
signal-hook = "0.4.4"
tabled = "0.20.0"
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt", "signal"] }
+2 -2
View File
@@ -1,4 +1,4 @@
from mcluseau/rust:1.88.0 as build
from mcluseau/rust:1.95.0 as build
workdir /app
copy . .
@@ -10,6 +10,6 @@ run \
&& find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ +
# ------------------------------------------------------------------------
from alpine:3.22
from alpine:3.23.4
copy --from=build /dist/ /bin/
+1 -1
View File
@@ -1 +1 @@
cargo install --path . --root dist
cargo install --locked --path . --root dist
Executable
+17
View File
@@ -0,0 +1,17 @@
set -ex
tag=$(git describe --always)
repo=novit.tech/direktil/dkl:$tag
docker build --push --platform=linux/amd64,linux/arm64 . -t $repo
publish() {
arch=$1
pf=$2
curl --user $(jq '.auths["novit.tech"].auth' ~/.docker/config.json -r |base64 -d) \
--upload-file <(docker run --rm --platform $pf $repo cat /bin/dkl) \
https://novit.tech/api/packages/direktil/generic/dkl/$tag/dkl.$arch
}
publish x86_64 linux/amd64
publish arm64 linux/arm64
+83 -16
View File
@@ -1,35 +1,102 @@
use eyre::Result;
use eyre::{Result, format_err};
use log::info;
use std::path::Path;
use tokio::fs;
pub async fn files(files: &[crate::File], root: &str) -> Result<()> {
for file in files {
use crate::{base64_decode, File};
pub async fn files(files: &[File], root: &str, dry_run: bool) -> Result<()> {
for f in files {
if let Err(e) = file(f, root, dry_run).await {
return Err(format_err!("{}: {e}", f.path))
}
}
Ok(())
}
pub async fn file(file: &File, root: &str, dry_run: bool) -> Result<()> {
let path = chroot(root, &file.path);
let path = Path::new(&path);
if let Some(parent) = path.parent() {
if !dry_run && let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
use crate::FileKind as K;
match &file.kind {
K::Content(content) => fs::write(path, content.as_bytes()).await?,
K::Dir(true) => fs::create_dir(path).await?,
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
K::Symlink(tgt) => fs::symlink(tgt, path).await?,
use crate::{FileKind as K, FilePart as P};
match file.kind().as_ref() {
K::Skip => {
info!("{}: kind is skip", file.path);
return Ok(())
},
K::Content(content) => {
if dry_run {
info!(
"would create {} ({} bytes from content)",
file.path,
content.len()
);
} else {
fs::write(path, content.as_bytes()).await?;
}
}
K::Content64(content) => {
let content = base64_decode(content)?;
if dry_run {
info!(
"would create {} ({} bytes from content64)",
file.path,
content.len()
);
} else {
fs::write(path, content).await?
}
}
K::Parts(parts) => {
let mut assembly = Vec::new();
for part in parts {
match part {
P::Content(content) => assembly.extend(content.as_bytes()),
P::Content64(content) => assembly.extend(base64_decode(content)?),
}
}
if dry_run {
info!(
"would create {} ({} bytes from parts)",
file.path,
assembly.len()
);
} else {
fs::write(path, assembly).await?
}
}
K::Dir => {
if dry_run {
info!("would create {} (directory)", file.path);
} else {
fs::create_dir(path).await?;
}
}
K::Symlink(tgt) => {
if dry_run {
info!("would create {} (symlink to {})", file.path, tgt);
} else {
let _ = fs::remove_file(path).await; // we're ln --force
fs::symlink(tgt, path).await?;
}
}
}
match file.kind {
K::Symlink(_) => {}
_ => set_perms(path, file.mode).await?,
if dry_run {
return Ok(());
}
if !file.is_symlink() {
set_perms(path, file.mode).await?;
}
info!("created {}", file.path);
}
Ok(())
}
}
pub async fn set_perms(path: impl AsRef<Path>, mode: Option<u32>) -> std::io::Result<()> {
if let Some(mode) = mode.filter(|m| *m != 0) {
+173 -27
View File
@@ -1,11 +1,16 @@
use clap::{CommandFactory, Parser, Subcommand};
use eyre::{format_err, Result};
use human_units::Duration;
use log::{debug, error};
use std::net::SocketAddr;
use std::path::PathBuf;
use tokio::fs;
#[derive(Parser)]
#[command()]
struct Cli {
#[arg(long)]
log_to: Option<PathBuf>,
#[command(subcommand)]
command: Command,
}
@@ -22,18 +27,24 @@ enum Command {
/// path prefix (aka chroot)
#[arg(short = 'P', long, default_value = "/")]
prefix: String,
/// don't really write files
#[arg(long)]
dry_run: bool,
},
Logger {
/// Path where the logs are stored
#[arg(long, short = 'p', default_value = "/var/log", env = "DKL_LOG_PATH")]
log_path: String,
log_path: PathBuf,
/// Name of the log instead of the command's basename
#[arg(long, short = 'n')]
log_name: Option<String>,
log_name: Option<PathBuf>,
/// prefix log lines with time & stream
#[arg(long)]
with_prefix: bool,
command: String,
/// exec command in this cgroup
#[arg(long)]
cgroup: Option<String>,
command: PathBuf,
args: Vec<String>,
},
Log {
@@ -65,6 +76,67 @@ enum Command {
#[arg(long, default_value = "/")]
chroot: std::path::PathBuf,
},
Proxy {
#[arg(long, short = 'l')]
listen: Vec<SocketAddr>,
targets: Vec<SocketAddr>,
/// target polling interval
#[arg(long, default_value = "30s")]
poll: Duration,
/// connect or check timeout
#[arg(long, default_value = "5s")]
timeout: Duration,
},
Cg {
#[command(subcommand)]
cmd: CgCmd,
},
Rc {
#[command(subcommand)]
cmd: RcCmd,
},
}
#[derive(Subcommand)]
enum CgCmd {
Ls {
#[arg(long)]
root: Option<PathBuf>,
#[arg(long, short = 'X')]
exclude: Vec<String>,
#[arg(long, short = 'C')]
cols: Option<String>,
},
}
#[derive(Subcommand)]
enum RcCmd {
Run,
Ls,
Status,
ReloadConfig,
Start {
#[arg(add = completions(dkl::rc::complete))]
key: String,
},
Stop {
#[arg(add = completions(dkl::rc::complete))]
key: String,
},
Reload {
#[arg(add = completions(dkl::rc::complete))]
key: String,
},
Sig {
#[arg(add = completions(dkl::rc::complete))]
key: String,
signal: u32,
},
Ctl {
args: Vec<String>,
},
}
#[tokio::main(flavor = "current_thread")]
@@ -73,10 +145,22 @@ async fn main() -> Result<()> {
let cli = Cli::parse();
env_logger::builder()
.parse_filters("info")
.parse_default_env()
.init();
{
let mut builder = env_logger::builder();
builder.parse_filters("info").parse_default_env();
if let Some(log_to) = cli.log_to {
builder.target(env_logger::Target::Pipe(Box::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_to)
.unwrap(),
)));
}
builder.init();
}
use Command as C;
match cli.command {
@@ -84,27 +168,30 @@ async fn main() -> Result<()> {
config,
filters,
prefix,
dry_run,
} => {
let filters = parse_globs(&filters)?;
apply_config(&config, &filters, &prefix).await
apply_config(&config, &filters, &prefix, dry_run).await
}
C::Logger {
ref log_path,
ref log_name,
with_prefix,
command,
args,
} => {
let command = command.as_str();
let log_name = log_name.as_deref().unwrap_or_else(|| basename(command));
dkl::logger::Logger {
log_path,
log_name,
with_prefix,
}
.run(command, &args)
.await
cgroup,
command,
args,
} => {
let log_name = log_name.unwrap_or_else(|| command.file_prefix().unwrap().into());
let logger = dkl::logger::Logger {
log_path,
log_name,
with_prefix,
cgroup,
};
let cmd = logger.setup(command, &args).await?;
logger.exec(cmd).await
}
C::Log {
log_path,
@@ -126,10 +213,51 @@ async fn main() -> Result<()> {
.install(layer, version)
.await
}
C::Proxy {
listen,
targets,
poll,
timeout,
} => Ok(dkl::proxy::Proxy {
listen_addrs: listen,
targets,
poll: poll.into(),
timeout: timeout.into(),
}
.run()
.await
.map(|_| ())?),
C::Cg { cmd } => match cmd {
CgCmd::Ls {
root,
exclude,
cols,
} => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?),
},
C::Rc { cmd } => match cmd {
RcCmd::Run => Ok(dkl::rc::run().await?),
RcCmd::Ls => Ok(dkl::rc::ctl(["ls"]).await?),
RcCmd::Status => Ok(dkl::rc::ctl(["status"]).await?),
RcCmd::ReloadConfig => Ok(dkl::rc::ctl(["reload-config"]).await?),
RcCmd::Start { key } => Ok(dkl::rc::ctl(["start", &key]).await?),
RcCmd::Stop { key } => Ok(dkl::rc::ctl(["stop", &key]).await?),
RcCmd::Reload { key } => Ok(dkl::rc::ctl(["reload", &key]).await?),
RcCmd::Sig { key, signal } => {
Ok(dkl::rc::ctl(["sig", &key, &signal.to_string()]).await?)
}
RcCmd::Ctl { args } => Ok(dkl::rc::ctl(&args).await?),
},
}
}
async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> {
async fn apply_config(
config_file: &str,
filters: &[glob::Pattern],
chroot: &str,
dry_run: bool,
) -> Result<()> {
let config = fs::read_to_string(config_file).await?;
let config: dkl::Config = serde_yaml::from_str(&config)?;
@@ -141,7 +269,7 @@ async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str
.collect()
};
dkl::apply::files(&files, chroot).await
dkl::apply::files(&files, chroot, dry_run).await
}
#[derive(Subcommand)]
@@ -232,10 +360,6 @@ fn parse_ts_arg(ts: Option<String>) -> Result<Option<dkl::logger::Timestamp>> {
}
}
fn basename(path: &str) -> &str {
path.rsplit_once('/').map_or(path, |split| split.1)
}
fn parse_globs(filters: &[String]) -> Result<Vec<glob::Pattern>> {
let mut errors = false;
let filters = (filters.iter())
@@ -255,3 +379,25 @@ fn parse_globs(filters: &[String]) -> Result<Vec<glob::Pattern>> {
Ok(filters)
}
use clap_complete::{ArgValueCandidates, CompletionCandidate};
fn completions(f: impl AsyncFn() -> Vec<String> + Send + Sync + 'static) -> ArgValueCandidates {
let f = std::sync::Arc::new(f);
ArgValueCandidates::new(move || {
let f = f.clone();
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move { f().await })
})
.join()
.into_iter()
.flatten()
.map(CompletionCandidate::new)
.collect::<Vec<_>>()
})
}
+21 -6
View File
@@ -36,6 +36,10 @@ enum Command {
},
#[command(subcommand)]
DlSet(DlSet),
/// hash a password
Hash {
salt: String,
},
}
#[derive(Subcommand)]
@@ -103,14 +107,16 @@ async fn main() -> eyre::Result<()> {
.parse_default_env()
.init();
let token = std::env::var("DLS_TOKEN").map_err(|_| format_err!("DLS_TOKEN should be set"))?;
let dls = dls::Client::new(cli.dls, token);
let dls = || {
let token = std::env::var("DLS_TOKEN").expect("DLS_TOKEN should be set");
dls::Client::new(cli.dls, token)
};
use Command as C;
match cli.command {
C::Clusters => write_json(&dls.clusters().await?),
C::Clusters => write_json(&dls().clusters().await?),
C::Cluster { cluster, command } => {
let dls = dls();
let cluster = dls.cluster(cluster);
use ClusterCommand as CC;
@@ -155,8 +161,9 @@ async fn main() -> eyre::Result<()> {
}
}
}
C::Hosts => write_json(&dls.hosts().await?),
C::Hosts => write_json(&dls().hosts().await?),
C::Host { out, host, asset } => {
let dls = dls();
let host_name = host.clone();
let host = dls.host(host);
match asset {
@@ -171,7 +178,7 @@ async fn main() -> eyre::Result<()> {
C::DlSet(set) => match set {
DlSet::Sign { expiry, items } => {
let req = dls::DownloadSetReq { expiry, items };
let signed = dls.sign_dl_set(&req).await?;
let signed = dls().sign_dl_set(&req).await?;
println!("{signed}");
}
DlSet::Show { signed_set } => {
@@ -211,11 +218,19 @@ async fn main() -> eyre::Result<()> {
name,
asset,
} => {
let dls = dls();
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
let mut out = create_asset_file(out, &kind, &name, &asset).await?;
copy_stream(stream, &mut out).await?;
}
},
C::Hash { salt } => {
let salt = dkl::base64_decode(&salt)?;
let passphrase = rpassword::prompt_password("password to hash: ")?;
let hash = dls::store::hash_password(&salt, &passphrase)?;
println!("hash (hex): {}", hex::encode(&hash));
println!("hash (base64): {}", dkl::base64_encode(&hash));
}
};
Ok(())
+22 -15
View File
@@ -2,7 +2,7 @@ use std::collections::BTreeMap as Map;
pub const TAKE_ALL: i16 = -1;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct Config {
pub anti_phishing_code: String,
@@ -42,21 +42,11 @@ impl Config {
pub fn new(bootstrap_dev: String) -> Self {
Self {
anti_phishing_code: "Direktil<3".into(),
keymap: None,
modules: None,
resolv_conf: None,
vpns: Map::new(),
networks: vec![],
auths: vec![],
ssh: Default::default(),
pre_lvm_crypt: vec![],
lvm: vec![],
crypt: vec![],
signer_public_key: None,
bootstrap: Bootstrap {
dev: bootstrap_dev,
seed: None,
..Default::default()
},
..Default::default()
}
}
}
@@ -88,6 +78,17 @@ pub struct NetworkInterface {
pub udev: Option<UdevFilter>,
}
impl Default for NetworkInterface {
fn default() -> Self {
Self {
var: "iface".into(),
n: 1,
regexps: Vec::new(),
udev: Some(UdevFilter::Eq("INTERFACE".into(), "eth0".into())),
}
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct SSHServer {
pub listen: String,
@@ -104,7 +105,7 @@ impl Default for SSHServer {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmVG {
#[serde(alias = "vg")]
#[serde(rename = "vg", alias = "name")]
pub name: String,
pub pvs: LvmPV,
@@ -244,9 +245,15 @@ pub struct Raid {
pub stripes: Option<u8>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct Bootstrap {
pub dev: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub seed: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub seed_ca: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub seed_servername: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub seed_proxy: Option<String>,
}
+381
View File
@@ -0,0 +1,381 @@
use std::borrow::Cow;
use std::fmt::Display;
use std::path::{Path as StdPath, PathBuf};
use std::rc::Rc;
use std::str::FromStr;
use crate::{fs, human::Human};
pub const ROOT: &str = "/sys/fs/cgroup";
pub async fn ls(
parent: Option<impl AsRef<StdPath>>,
exclude: &[String],
columns: Option<&str>,
) -> fs::Result<()> {
let mut root = PathBuf::from(ROOT);
if let Some(parent) = parent {
root = root.join(parent);
}
let cols: [(&str, fn(&Cgroup) -> String); _] = [
("wset", |cg| cg.memory.working_set().human()),
("anon", |cg| cg.memory.stat.anon.human()),
("min", |cg| cg.memory.min.human()),
("low", |cg| cg.memory.low.human()),
("high", |cg| cg.memory.high.human()),
("max", |cg| cg.memory.max.human()),
];
let cols = if let Some(columns) = columns {
(cols.into_iter())
.filter(|(n, _)| columns.split(',').any(|col| &col == n))
.collect()
} else {
cols.to_vec()
};
let mut table = tabled::builder::Builder::new();
table.push_record(["cgroup"].into_iter().chain(cols.iter().map(|(n, _)| *n)));
let mut todo = vec![(Cgroup::root(root).await?, vec![], true)];
while let Some((cg, p_lasts, last)) = todo.pop() {
let mut name = String::new();
for last in p_lasts.iter().skip(1) {
name.push_str(if *last { " " } else { "| " });
}
if !p_lasts.is_empty() {
name.push_str(if last { "`- " } else { "|- " });
}
name.push_str(&cg.name());
table.push_record([name].into_iter().chain(cols.iter().map(|(_, f)| f(&cg))));
let mut p_lasts = p_lasts.clone();
p_lasts.push(last);
let mut children = cg.read_children().await?;
children.sort();
todo.extend(
(children.into_iter().rev())
.filter(|c| !exclude.iter().any(|x| x == &c.path.full_name()))
.enumerate()
.map(|(i, child)| (child, p_lasts.clone(), i == 0)),
);
}
use tabled::settings::{
object::{Column, Row},
Alignment, Modify,
};
let mut table = table.build();
table.with(tabled::settings::Style::psql());
table.with(Alignment::right());
table.with(Modify::list(Column::from(0), Alignment::left()));
table.with(Modify::list(Row::from(0), Alignment::left()));
println!("{}", table);
Ok(())
}
pub struct Cgroup {
path: Rc<Path>,
children: Vec<PathBuf>,
memory: Memory,
}
impl Cgroup {
pub async fn root(path: impl AsRef<StdPath>) -> fs::Result<Self> {
let path = path.as_ref();
Self::read(Path::root(path), path).await
}
async fn read(cg_path: Rc<Path>, path: impl AsRef<StdPath>) -> fs::Result<Self> {
let path = path.as_ref();
use fs::Error as E;
let mut rd = fs::read_dir(path).await?;
let mut cg = Self {
path: cg_path,
children: Vec::new(),
memory: Memory::default(),
};
while let Some(entry) = (rd.next_entry().await).map_err(|e| E::ReadDir(path.into(), e))? {
let path = entry.path();
let Some(file_name) = path.file_name() else {
continue;
};
if (entry.file_type().await)
.map_err(|e| E::Stat(path.clone(), e))?
.is_dir()
{
cg.children.push(file_name.into());
continue;
}
let file_name = file_name.as_encoded_bytes();
let Some(idx) = file_name.iter().position(|b| *b == b'.') else {
continue;
};
let (controller, param) = file_name.split_at(idx);
let param = &param[1..];
match controller {
b"memory" => match param {
b"current" => cg.memory.current = read_parse(path).await?,
b"low" => cg.memory.low = read_parse(path).await?,
b"high" => cg.memory.high = read_parse(path).await?,
b"min" => cg.memory.min = read_parse(path).await?,
b"max" => cg.memory.max = read_parse(path).await?,
b"stat" => cg.memory.stat.read_from(path).await?,
_ => {}
},
_ => {}
}
}
Ok(cg)
}
async fn read_children(&self) -> fs::Result<Vec<Self>> {
let mut r = Vec::with_capacity(self.children.len());
let mut dir = PathBuf::from(self.path.as_ref());
for child_name in &self.children {
dir.push(child_name);
let child_path = Path::Child(self.path.clone(), child_name.into());
r.push(Self::read(child_path.into(), &dir).await?);
dir.pop();
}
Ok(r)
}
pub fn name(&self) -> Cow<'_, str> {
self.path.name()
}
pub fn full_name(&self) -> String {
self.path.full_name()
}
pub fn children(&self) -> impl Iterator<Item = &StdPath> {
self.children.iter().map(|n| n.as_path())
}
pub async fn read_child(&self, name: impl AsRef<StdPath>) -> fs::Result<Self> {
let name = name.as_ref();
let mut dir = PathBuf::from(self.path.as_ref());
dir.push(name);
Self::read(self.path.child(name), &dir).await
}
pub async fn write_param(
&self,
name: impl AsRef<StdPath>,
value: impl AsRef<[u8]>,
) -> fs::Result<()> {
let cg_dir = PathBuf::from(self.path.as_ref());
fs::write(cg_dir.join(name), value).await
}
}
impl PartialEq for Cgroup {
fn eq(&self, o: &Self) -> bool {
&self.path == &o.path
}
}
impl Eq for Cgroup {}
impl Ord for Cgroup {
fn cmp(&self, o: &Self) -> std::cmp::Ordering {
self.path.cmp(&o.path)
}
}
impl PartialOrd for Cgroup {
fn partial_cmp(&self, o: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(o))
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Path {
Root(PathBuf),
Child(Rc<Path>, PathBuf),
}
impl Path {
fn name(&self) -> Cow<'_, str> {
match self {
Self::Root(_) => "/".into(),
Self::Child(_, n) => n.to_string_lossy(),
}
}
fn full_name(&self) -> String {
use Path::*;
match self {
Root(_) => "/".into(),
Child(parent, _) => match parent.as_ref() {
Root(_) => self.name().into(),
Child(_, _) => format!("{}/{}", parent.full_name(), self.name()),
},
}
}
fn depth(&self) -> usize {
use Path::*;
match self {
Root(_) => 0,
Child(p, _) => 1 + p.depth(),
}
}
fn root(dir: impl Into<PathBuf>) -> Rc<Self> {
Rc::new(Self::Root(dir.into()))
}
fn child(self: &Rc<Self>, name: impl Into<PathBuf>) -> Rc<Self> {
Rc::new(Self::Child(self.clone(), name.into()))
}
}
impl From<&Path> for PathBuf {
fn from(mut p: &Path) -> Self {
let mut stack = Vec::with_capacity(p.depth() + 1);
loop {
match p {
Path::Root(root_path) => {
stack.push(root_path);
break;
}
Path::Child(parent, n) => {
stack.push(n);
p = parent;
}
}
}
let len = stack.iter().map(|p| p.as_os_str().len() + 1).sum::<usize>() - 1;
let mut buf = PathBuf::with_capacity(len);
buf.extend(stack.into_iter().rev());
buf
}
}
#[test]
fn test_path_to_pathbuf() {
let root = Path::root("/a/b");
let c1 = root.child("c1");
let c1_1 = c1.child("c1-1");
assert_eq!(PathBuf::from("/a/b/c1"), PathBuf::from(c1.as_ref()));
assert_eq!(PathBuf::from("/a/b/c1/c1-1"), PathBuf::from(c1_1.as_ref()));
}
#[derive(Default)]
struct Memory {
current: Option<u64>,
low: Option<u64>,
high: Option<Max>,
min: Option<u64>,
max: Option<Max>,
stat: MemoryStat,
}
impl Memory {
/// working set as defined by cAdvisor
/// (https://github.com/google/cadvisor/blob/e1ccfa9b4cf2e17d74e0f5526b6487b74b704503/container/libcontainer/handler.go#L853-L862)
fn working_set(&self) -> Option<u64> {
let cur = self.current?;
let inactive = self.stat.inactive_file?;
(inactive <= cur).then(|| cur - inactive)
}
}
#[derive(Default)]
struct MemoryStat {
anon: Option<u64>,
file: Option<u64>,
kernel: Option<u64>,
kernel_stack: Option<u64>,
pagetables: Option<u64>,
shmem: Option<u64>,
inactive_file: Option<u64>,
}
enum Max {
Num(u64),
Max,
}
impl Display for Max {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Num(n) => write!(f, "{n}"),
Self::Max => f.write_str("max"),
}
}
}
impl Human for Max {
fn human(&self) -> String {
match self {
Self::Num(n) => n.human(),
Self::Max => "+∞".into(),
}
}
}
impl FromStr for Max {
type Err = std::num::ParseIntError;
fn from_str(s: &str) -> std::result::Result<Self, <Self as FromStr>::Err> {
Ok(match s {
"max" => Self::Max,
s => Self::Num(s.parse()?),
})
}
}
async fn read_parse<T: FromStr>(path: impl AsRef<StdPath>) -> fs::Result<Option<T>>
where
T::Err: Display,
{
let path = path.as_ref();
(fs::read_to_string(path).await?)
.trim_ascii()
.parse()
.map_err(|e| fs::Error::Other(path.into(), format!("parse failed: {e}")))
.map(|v| Some(v))
}
impl MemoryStat {
async fn read_from(&mut self, path: impl AsRef<StdPath>) -> fs::Result<()> {
for line in (fs::read_to_string(path).await?).lines() {
let Some((key, value)) = line.split_once(' ') else {
continue;
};
let value = value.parse::<u64>().ok();
match key {
"anon" => self.anon = value,
"file" => self.file = value,
"kernel" => self.kernel = value,
"kernel_stack" => self.kernel_stack = value,
"pagetables" => self.pagetables = value,
"shmem" => self.shmem = value,
"inactive_file" => self.inactive_file = value,
_ => {}
}
}
Ok(())
}
}
+34 -51
View File
@@ -5,7 +5,8 @@ use reqwest::Method;
use std::collections::BTreeMap as Map;
use std::fmt::Display;
use std::net::IpAddr;
use std::time::Duration;
pub mod store;
pub struct Client {
base_url: String,
@@ -160,6 +161,32 @@ impl<'t> Host<'t> {
}
}
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct Config {
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub clusters: Vec<ClusterConfig>,
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub hosts: Vec<HostConfig>,
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub host_templates: Vec<HostConfig>,
#[serde(default, rename = "SSLConfig")]
pub ssl_config: String,
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub extra_ca_certs: Map<String, String>,
}
// compensate for go's encoder pitfalls
use serde::{Deserialize, Deserializer};
fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
where
T: Default + Deserialize<'de>,
D: Deserializer<'de>,
{
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct ClusterConfig {
@@ -172,15 +199,15 @@ pub struct ClusterConfig {
#[serde(rename_all = "PascalCase")]
pub struct HostConfig {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cluster_name: Option<String>,
#[serde(rename = "IPs")]
pub ips: Vec<IpAddr>,
#[serde(skip_serializing_if = "Map::is_empty")]
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub labels: Map<String, String>,
#[serde(skip_serializing_if = "Map::is_empty")]
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub annotations: Map<String, String>,
#[serde(rename = "IPXE", skip_serializing_if = "Option::is_none")]
@@ -190,10 +217,13 @@ pub struct HostConfig {
pub kernel: String,
pub versions: Map<String, String>,
/// initrd config template
pub bootstrap_config: String,
/// files to add to the final initrd config, with rendering
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub initrd_files: Vec<crate::File>,
/// system config template
pub config: String,
}
@@ -306,50 +336,3 @@ pub enum Error {
#[error("response parsing failed: {0}")]
Parse(serde_json::Error),
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum File {
Static(crate::File),
Gen { path: String, from: ContentGen },
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentGen {
CaCrt(CaRef),
TlsKey(TlsRef),
TlsCrt {
key: TlsRef,
ca: CaRef,
profile: CertProfile,
},
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CaRef {
Global(String),
Cluster(String, String),
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TlsRef {
Cluster(String, String),
Host(String, String),
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CertProfile {
Client,
Server,
/// basicaly Client+Server
Peer,
Kube {
user: String,
group: String,
duration: Duration,
},
}
+17
View File
@@ -0,0 +1,17 @@
pub fn hash_password(salt: &[u8], passphrase: &str) -> argon2::Result<[u8; 32]> {
let hash = argon2::hash_raw(
passphrase.as_bytes(),
salt,
&argon2::Config {
variant: argon2::Variant::Argon2id,
hash_length: 32,
time_cost: 1,
mem_cost: 65536,
thread_mode: argon2::ThreadMode::Parallel,
lanes: 4,
..Default::default()
},
)?;
unsafe { Ok(hash.try_into().unwrap_unchecked()) }
}
+1 -1
View File
@@ -60,7 +60,7 @@ impl<'t> Dynlay<'t> {
// mount layer
info!("mounting layer");
sudo("mount", &["-t", "squashfs", lay_path, &mount_path_str]).await?;
sudo("mount", &[lay_path, &mount_path_str]).await?;
let mut paths = spawn_walk_dir(mount_path.clone());
while let Some(result) = paths.recv().await {
+48 -4
View File
@@ -1,9 +1,53 @@
use eyre::Result;
use std::fs::Metadata;
use std::path::PathBuf;
use tokio::fs::read_dir;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::sync::mpsc;
pub type Result<T> = std::result::Result<T, Error>;
pub use tokio::fs::ReadDir;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}: read dir: {1}")]
ReadDir(PathBuf, std::io::Error),
#[error("{0}: exists: {1}")]
Exists(PathBuf, std::io::Error),
#[error("{0}: read: {1}")]
Read(PathBuf, std::io::Error),
#[error("{0}: stat: {1}")]
Stat(PathBuf, std::io::Error),
#[error("{0}: create dir: {1}")]
CreateDir(PathBuf, std::io::Error),
#[error("{0}: write: {1}")]
Write(PathBuf, std::io::Error),
#[error("{0}: remove file: {1}")]
RemoveFile(PathBuf, std::io::Error),
#[error("{0}: symlink: {1}")]
Symlink(PathBuf, std::io::Error),
#[error("{0}: {1}")]
Other(PathBuf, String),
}
macro_rules! wrap_path {
($fn:ident $( ( $( $pname:ident : $ptype:ty ),* ) )? -> $result:ty, $err:ident) => {
pub async fn $fn(path: impl AsRef<Path>$($(, $pname: $ptype)*)?) -> Result<$result> {
let path = path.as_ref();
fs::$fn(path $($(, $pname)*)?).await.map_err(|e| Error::$err(path.into(), e))
}
};
}
wrap_path!(read_dir -> ReadDir, ReadDir);
wrap_path!(try_exists -> bool, Exists);
wrap_path!(read -> Vec<u8>, Read);
wrap_path!(read_to_string -> String, Read);
wrap_path!(create_dir -> (), CreateDir);
wrap_path!(create_dir_all -> (), CreateDir);
wrap_path!(remove_file -> (), RemoveFile);
wrap_path!(symlink(link_src: impl AsRef<Path>) -> (), Symlink);
wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write);
pub fn spawn_walk_dir(
dir: impl Into<PathBuf> + Send + 'static,
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
@@ -24,7 +68,7 @@ pub async fn walk_dir(dir: impl Into<PathBuf>, tx: mpsc::Sender<Result<(PathBuf,
let entry = match rd.next_entry().await {
Ok(v) => v,
Err(e) => {
if tx.send(Err(e.into())).await.is_err() {
if tx.send(Err(Error::ReadDir(dir.clone(), e))).await.is_err() {
return;
}
todo.pop_front(); // skip dir on error
+35
View File
@@ -0,0 +1,35 @@
use human_units::FormatSize;
use std::fmt::{Display, Formatter, Result};
pub trait Human {
fn human(&self) -> String;
}
pub struct Quantity(u64);
impl Display for Quantity {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
self.0.format_size().fmt(f)
}
}
impl Human for Quantity {
fn human(&self) -> String {
self.to_string()
}
}
impl Human for u64 {
fn human(&self) -> String {
self.format_size().to_string()
}
}
impl<T: Human> Human for Option<T> {
fn human(&self) -> String {
match self {
Some(h) => h.human(),
None => "".to_string(),
}
}
}
+89 -9
View File
@@ -1,9 +1,15 @@
use std::borrow::Cow;
pub mod apply;
pub mod bootstrap;
pub mod cgroup;
pub mod dls;
pub mod dynlay;
pub mod fs;
pub mod human;
pub mod logger;
pub mod proxy;
pub mod rc;
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct Config {
@@ -52,19 +58,93 @@ pub struct User {
pub gid: Option<u32>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Default, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct File {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<u32>,
#[serde(flatten)]
pub kind: FileKind,
pub kind: Option<FileKind>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content64: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub symlink: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parts: Option<Vec<FilePart>>,
#[serde(default, skip_serializing_if = "is_false")]
pub dir: bool,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FileKind {
Content(String),
Symlink(String),
Dir(bool),
fn is_false(b: &bool) -> bool {
!b
}
#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum FileKind {
#[default]
Skip,
Content(String),
Content64(String),
Parts(Vec<FilePart>),
Symlink(String),
Dir,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum FilePart {
Content(String),
Content64(String),
}
// ------------------------------------------------------------------------
impl Config {
pub fn has_file(&self, path: &str) -> bool {
self.files.iter().any(|f| f.path == path)
}
pub fn file(&self, path: &str) -> Option<&File> {
self.files.iter().find(|f| f.path == path)
}
}
pub fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
use base64::{prelude::BASE64_STANDARD_NO_PAD as B64, Engine as _};
B64.decode(s.trim_end_matches('='))
}
pub fn base64_encode(b: &[u8]) -> String {
use base64::{prelude::BASE64_STANDARD as B64, Engine as _};
B64.encode(b)
}
impl<'t> File {
pub fn kind(&'t self) -> Cow<'t, FileKind> {
self.kind.as_ref().map(Cow::Borrowed).unwrap_or_else(|| {
use FileKind::*;
Cow::Owned(if let Some(ref s) = self.content {
Content(s.clone())
} else if let Some(ref s) = self.content64 {
Content64(s.clone())
} else if let Some(ref s) = self.symlink {
Symlink(s.clone())
} else if let Some(ref p) = self.parts {
Parts(p.clone())
} else if self.dir {
Dir
} else {
Skip
})
})
}
pub fn is_symlink(&self) -> bool {
if let Some(ref kind) = self.kind {
matches!(kind, FileKind::Symlink(_))
} else {
self.symlink.is_some()
}
}
}
+218 -77
View File
@@ -2,16 +2,19 @@ use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
use chrono::{DurationRound, TimeDelta, Utc};
use eyre::{format_err, Result};
use log::{debug, error, warn};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::{
fs::{self, File},
fs::File,
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
process,
process::{Child, Command},
sync::mpsc,
time::{sleep, Duration},
};
use crate::{cgroup, fs};
pub type Timestamp = chrono::DateTime<Utc>;
const TS_FORMAT: &str = "%Y%m%d_%H";
@@ -20,45 +23,92 @@ const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);
pub struct Logger<'t> {
pub log_path: &'t str,
pub log_name: &'t str,
pub struct Logger {
pub log_path: PathBuf,
pub log_name: PathBuf,
pub with_prefix: bool,
pub cgroup: Option<String>,
}
impl<'t> Logger<'t> {
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
impl Logger {
pub async fn setup<I, S>(&self, command: impl AsRef<OsStr>, args: I) -> fs::Result<Command>
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
// make sure we can at least open the log before starting the command
let archives_path = &format!("{path}/archives", path = self.log_path);
(fs::create_dir_all(archives_path).await)
.map_err(|e| format_err!("failed to create archives dir: {e}"))?;
let archives_read_dir = (fs::read_dir(archives_path).await)
.map_err(|e| format_err!("failed to list archives: {e}"))?;
let archives_path = &self.log_path.join("archives");
fs::create_dir_all(archives_path).await?;
let mut prev_stamp = trunc_ts(Utc::now());
let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?);
let archives_read_dir = fs::read_dir(archives_path).await?;
let prev_stamp = trunc_ts(Utc::now());
tokio::spawn(compress_archives(
archives_read_dir,
self.log_name.to_string(),
self.log_name.clone(),
prev_stamp.format(TS_FORMAT).to_string(),
));
// start the command
let mut child = process::Command::new(command)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
// create the command
let mut cmd = Command::new(command);
let (tx, mut rx) = mpsc::unbounded_channel();
cmd.args(args);
if let Some(cgroup) = self.cgroup.as_deref() {
let cg_path = PathBuf::from(cgroup::ROOT)
.join(cgroup)
.join(&self.log_name);
fs::create_dir_all(&cg_path).await?;
let procs_file = cg_path.join("cgroup.procs");
debug!("procs file {}", procs_file.display());
unsafe { cmd.pre_exec(move || std::fs::write(&procs_file, b"0")) };
}
Ok(cmd)
}
pub fn spawn(self, mut cmd: Command) -> std::io::Result<Child> {
// setup outputs for capture
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
// spawn
let mut child = cmd.spawn()?;
// capture outputs
let (tx, rx) = mpsc::channel(8);
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
// log outputs
tokio::spawn(self.log_stream(rx));
Ok(child)
}
// TODO: Result<!> when stable
pub async fn exec(self, cmd: Command) -> Result<()> {
let mut child = self.spawn(cmd)?;
// forward signals
if let Some(child_pid) = child.id() {
forward_signals_to(child_pid as i32);
}
let status = child.wait().await?;
std::process::exit(status.code().unwrap_or(-1));
}
async fn log_stream(self, mut rx: mpsc::Receiver<LogItem>) {
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
// handle output
let mut prev_stamp = trunc_ts(Utc::now());
let mut current_log = BufWriter::new(self.eventually_open_log(prev_stamp).await);
loop {
tokio::select!(
r = rx.recv() => {
@@ -79,13 +129,10 @@ impl<'t> Logger<'t> {
}
// finalize
while let Err(e) = current_log.flush().await {
error!("final log flush failed: {e}");
while let Err(e) = current_log.shutdown().await {
error!("final log shutdown failed: {e}");
sleep(WRITE_RETRY_DELAY).await;
}
let status = child.wait().await?;
std::process::exit(status.code().unwrap_or(-1));
}
async fn log_item(
@@ -112,9 +159,24 @@ impl<'t> Logger<'t> {
}
out.write_all(&log.line).await?;
if log.line.last() != Some(&b'\n') {
out.write("\n".as_bytes()).await?;
}
Ok(())
}
async fn eventually_open_log(&self, ts: Timestamp) -> File {
loop {
match self.open_log(ts).await {
Ok(log) => break log,
Err(e) => {
error!("open log failed: {e}");
sleep(WRITE_RETRY_DELAY).await;
}
}
}
}
async fn open_log(&self, ts: Timestamp) -> Result<File> {
let log_file = &self.archive_path(ts);
@@ -125,36 +187,62 @@ impl<'t> Logger<'t> {
.open(log_file)
.await?;
let link_src = &format!(
"{path}/{name}.log",
path = self.log_path,
name = self.log_name
);
let link_tgt = log_file
.strip_prefix(&format!("{}/", self.log_path))
.unwrap_or(log_file);
let link_src = &self
.log_path
.join(&self.log_name)
.with_added_extension("log");
let link_tgt = &self.archive_rel_path(ts);
let _ = fs::remove_file(link_src).await;
fs::symlink(link_tgt, link_src)
.await
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?;
fs::symlink(link_tgt, link_src).await.map_err(|e| {
format_err!(
"symlink {s} -> {t} failed: {e}",
s = link_src.display(),
t = link_tgt.display()
)
})?;
Ok(file)
}
fn archive_path(&self, ts: Timestamp) -> String {
format!(
"{path}/archives/{file}",
path = self.log_path,
file = self.archive_file(ts)
)
fn archive_path(&self, ts: Timestamp) -> PathBuf {
self.log_path.join(self.archive_rel_path(ts))
}
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
PathBuf::from("archives").join(self.archive_file(ts))
}
fn archive_file(&self, ts: Timestamp) -> PathBuf {
self.log_name
.with_added_extension(ts.format(TS_FORMAT).to_string())
.with_added_extension("log")
}
}
fn forward_signals_to(pid: i32) {
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use signal_hook::{consts::*, low_level::register};
debug!("forwarding signals to pid {pid}");
let pid = Pid::from_raw(pid);
let signals = [
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
];
for sig in signals {
let Ok(signal) = Signal::try_from(sig) else {
continue;
};
unsafe {
register(sig, move || {
debug!("forwarding {signal} to {pid}");
let _ = kill(pid, signal);
})
.ok();
}
fn archive_file(&self, ts: Timestamp) -> String {
format!(
"{name}.{ts}.log",
name = self.log_name,
ts = ts.format(TS_FORMAT),
)
}
}
@@ -164,32 +252,66 @@ struct LogItem {
line: Vec<u8>,
}
async fn copy(
stream_name: &'static str,
out: impl AsyncRead + Unpin,
tx: mpsc::UnboundedSender<LogItem>,
) {
let mut out = BufReader::new(out);
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
let buf_size = page_size::get();
loop {
let mut out = BufReader::with_capacity(buf_size, out);
let mut line = Vec::with_capacity(buf_size);
if let Err(e) = out.read_until(b'\n', &mut line).await {
warn!("read {stream_name} failed: {e}");
return;
}
if line.is_empty() {
break;
}
macro_rules! send_line {
() => {
let log = LogItem {
stream_name,
ts: chrono::Utc::now(),
line,
line: line.clone(),
};
if let Err(e) = tx.send(log) {
if let Err(e) = tx.send(log).await {
warn!("send line failed: {e}");
return;
}
line.clear();
};
}
loop {
let Ok(buf) = (out.fill_buf())
.await
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
else {
break;
};
if buf.is_empty() {
break;
}
let remaining = buf_size - line.len();
if let Some(pos) = memchr::memchr(b'\n', buf) {
let len = pos + 1;
let mut buf = &buf[..len];
if len > remaining {
line.extend_from_slice(&buf[..remaining]);
send_line!();
buf = &buf[remaining..];
}
line.extend_from_slice(buf);
out.consume(len);
send_line!();
} else if buf.len() >= remaining {
line.extend_from_slice(&buf[..remaining]);
out.consume(remaining);
send_line!();
} else {
line.extend_from_slice(buf);
let len = buf.len();
out.consume(len);
}
}
if !line.is_empty() {
send_line!();
}
}
@@ -198,7 +320,12 @@ pub fn trunc_ts(ts: Timestamp) -> Timestamp {
.expect("duration_trunc failed")
}
async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) {
async fn compress_archives(
mut read_dir: fs::ReadDir,
log_name: impl AsRef<Path>,
exclude_ts: String,
) {
let log_name = log_name.as_ref();
loop {
let Ok(Some(entry)) =
(read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
@@ -250,14 +377,14 @@ async fn compress(path: impl AsRef<Path>) {
let mut out = ZstdEncoder::new(out);
async {
tokio::io::copy(&mut input, &mut out).await?;
out.flush().await
out.shutdown().await
}
.await
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
fs::remove_file(path)
.await
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
}
.await;
@@ -267,19 +394,22 @@ async fn compress(path: impl AsRef<Path>) {
}
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
let dt =
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
let format = &format!("{TS_FORMAT}%M%S");
let full_ts = &format!("{ts}0000");
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
}
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
let mut dir = PathBuf::from(log_path);
dir.push("archives");
let mut entries = Vec::new();
let mut read_dir = fs::read_dir(dir).await?;
let mut read_dir = fs::read_dir(&dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
while let Some(entry) =
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
{
let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else {
continue;
@@ -318,16 +448,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
Ok(entries)
}
#[derive(Debug, PartialEq, Eq, Ord)]
#[derive(Debug)]
pub struct LogFile {
pub path: PathBuf,
pub compressed: bool,
pub timestamp: Timestamp,
}
impl Eq for LogFile {}
impl PartialEq for LogFile {
fn eq(&self, other: &Self) -> bool {
self.timestamp == other.timestamp
}
}
impl Ord for LogFile {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.timestamp.cmp(&other.timestamp)
}
}
impl PartialOrd for LogFile {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.timestamp.partial_cmp(&other.timestamp)
Some(self.cmp(other))
}
}
+136
View File
@@ -0,0 +1,136 @@
use log::{info, log_enabled, warn};
use std::convert::Infallible;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::time::Duration;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
use tokio::time;
pub struct Proxy {
pub listen_addrs: Vec<SocketAddr>,
pub targets: Vec<SocketAddr>,
pub poll: Duration,
pub timeout: Duration,
}
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to listen on {0}: {1}")]
ListenFailed(SocketAddr, std::io::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl Proxy {
pub async fn run(self) -> Result<Infallible> {
let mut listeners = Vec::with_capacity(self.listen_addrs.len());
for addr in self.listen_addrs {
listeners.push(
TcpListener::bind(&addr)
.await
.map_err(|e| Error::ListenFailed(addr, e))?,
);
info!("listening on {addr}");
}
// all targets are initially ok (better land on a down one than just fail)
let targets: Vec<_> = (self.targets.into_iter())
.map(|addr| TargetStatus {
addr,
up: AtomicBool::new(true),
timeout: self.timeout,
})
.collect();
// the proxy runs forever -> using 'static is not a leak
let targets = targets.leak();
for listener in listeners {
tokio::spawn(proxy_listener(listener, targets));
}
check_targets(targets, self.poll).await
}
}
struct TargetStatus {
addr: SocketAddr,
up: AtomicBool,
timeout: Duration,
}
impl TargetStatus {
fn is_up(&self) -> bool {
self.up.load(Relaxed)
}
fn set_up(&self, is_up: bool) {
let prev = self.up.swap(is_up, Relaxed);
if prev != is_up {
if is_up {
info!("{} is up", self.addr);
} else {
warn!("{} is down", self.addr);
}
}
}
async fn connect(&self) -> io::Result<TcpStream> {
let r = match time::timeout(self.timeout, TcpStream::connect(self.addr)).await {
Ok(r) => r,
Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
};
self.set_up(r.is_ok());
r
}
}
async fn check_targets(targets: &'static [TargetStatus], poll: Duration) -> ! {
use tokio::time;
let mut poll_ticker = time::interval(poll);
poll_ticker.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
poll_ticker.tick().await;
let mut tasks = tokio::task::JoinSet::new();
for target in targets {
tasks.spawn(target.connect());
}
tasks.join_all().await;
if log_enabled!(log::Level::Info) {
let mut infos = String::new();
for ts in targets.iter() {
infos.push_str(&format!("{} ", ts.addr));
infos.push_str(if ts.is_up() { "up " } else { "down " });
}
info!("{infos}");
}
}
}
async fn proxy_listener(listener: TcpListener, targets: &'static [TargetStatus]) {
let mut rng = fastrand::Rng::new();
loop {
let mut active = Vec::with_capacity(targets.len());
let (mut src, _) = listener.accept().await.expect("listener.accept() failed");
active.extend((targets.iter().enumerate()).filter_map(|(i, ts)| ts.is_up().then_some(i)));
rng.shuffle(&mut active);
tokio::spawn(async move {
for i in active {
if let Ok(mut dst) = targets[i].connect().await {
let _ = tokio::io::copy_bidirectional(&mut src, &mut dst).await;
break;
}
}
});
}
}
+495
View File
@@ -0,0 +1,495 @@
use eyre::format_err;
use log::{error, info, warn};
use nix::sys::signal::Signal;
use std::collections::{BTreeMap as Map, BTreeSet as Set};
use std::path::PathBuf;
use std::sync::LazyLock;
use tokio::{
io::{copy, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
net::{UnixListener, UnixStream},
sync::{mpsc, watch, RwLock},
};
use crate::{cgroup, fs};
mod runner;
use runner::{Child, State};
const CFG_PATH: &str = "/etc/direktil/rc.yaml";
const SOCK_PATH: &str = "/run/dkl-rc/ctl.sock"; // Path::new when stable
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub struct Config {
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub cgroups: Map<String, CgroupConfig>,
}
#[derive(serde::Serialize, serde::Deserialize)]
pub struct CgroupConfig {
pub controllers: String,
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub settings: Map<String, String>,
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub services: Map<String, Service>,
}
pub type Service = Vec<String>;
static MANAGER: LazyLock<RwLock<Manager>> = LazyLock::new(|| RwLock::new(Manager::default()));
type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("invalid command: {0:?}")]
InvalidCommand(String),
#[error("config read failed: {0}")]
ConfigRead(fs::Error),
#[error("config parse failed: {0}")]
ConfigParse(serde_yaml::Error),
#[error("cgroup setup failed: {0}")]
CgroupSetup(fs::Error),
#[error("invalid key (cgroup/service)")]
InvalidKey,
#[error("unknown cgroup: {0:?}")]
UnknownCgroup(String),
#[error("unknown service: {0:?}")]
UnknownService(String),
#[error("invalid signal: {0:?}")]
InvalidSignal(String),
#[error("process exited")]
ProcessExited,
#[error("nothing running under {0:?}")]
NotRunning(String),
#[error("kill failed: {0:?}")]
KillFailed(nix::Error),
#[error("service runner is dead")]
RunnerDead,
}
pub async fn run() -> eyre::Result<()> {
info!("starting");
tokio::spawn(wait_terminate());
let _ = reload_config().await;
tokio::spawn(wait_reload());
if let Some(sock_dir) = PathBuf::from(SOCK_PATH).parent() {
let _ = tokio::fs::DirBuilder::new()
.mode(0o700)
.create(sock_dir)
.await;
}
let _ = tokio::fs::remove_file(SOCK_PATH).await;
let listener = UnixListener::bind(SOCK_PATH)?;
loop {
let Ok((conn, _)) = listener.accept().await else {
warn!("listener closed");
break;
};
tokio::spawn(async move { handle(conn).await });
}
cleanup().await;
Ok(())
}
async fn cleanup() {
let _ = tokio::fs::remove_file(SOCK_PATH).await;
}
pub async fn ctl<I, S>(args: I) -> eyre::Result<()>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let args: Vec<_> = args.into_iter().map(|s| s.into()).collect();
let args = format!("{}\n", args.join(" "));
match ctl_exec(args.as_bytes()).await {
Ok(mut rd) => {
copy(&mut rd, &mut tokio::io::stdout()).await?;
std::process::exit(0);
}
Err(e) => {
eprint!("{e}");
std::process::exit(1);
}
}
}
async fn ctl_exec(request: &[u8]) -> eyre::Result<BufReader<UnixStream>> {
let mut conn = UnixStream::connect(SOCK_PATH)
.await
.map_err(|e| format_err!("{SOCK_PATH}: {e}"))?;
conn.write_all(request).await?;
let mut rd = BufReader::with_capacity(64, conn);
let mut code = String::new();
rd.read_line(&mut code).await?;
let code: i32 = code.trim_ascii().parse()?;
if code != 0 {
let mut err = String::new();
rd.read_to_string(&mut err).await?;
return Err(format_err!("{}", err.trim_ascii_end()));
}
Ok(rd)
}
async fn handle(mut conn: UnixStream) {
let (rd, mut wr) = conn.split();
let mut rd = BufReader::with_capacity(64, rd).lines();
let Ok(Some(line)) = rd.next_line().await else {
return;
};
let mut line = line.split_ascii_whitespace();
macro_rules! next {
() => {{
match line.next() {
Some(v) => v,
None => return,
}
}};
}
let r = match next!() {
"ls" => Ok(Some(ls().await)),
"status" => Ok(Some(status().await)),
"reload-config" => reload_config().await.map(|_| None),
"start" => start(next!()).await.map(|_| None),
"stop" => stop(next!()).await.map(|_| None),
"reload" => reload(next!()).await.map(|_| None),
"sig" => sig(next!(), next!()).await.map(|_| None),
cmd => Err(Error::InvalidCommand(cmd.into())),
};
let _ = match r {
Ok(None) => wr.write_all(b"0\n").await,
Ok(Some(s)) => wr.write_all(format!("0\n{s}\n").as_bytes()).await,
Err(e) => wr.write_all(format!("1\n{e}\n").as_bytes()).await,
};
let _ = wr.shutdown().await;
}
async fn wait_terminate() {
use tokio::signal::unix::{signal, SignalKind};
let Ok(mut sig) = signal(SignalKind::terminate())
.inspect_err(|e| error!("failed to listen to SIGTERM (will be ignored): {e}"))
else {
return;
};
sig.recv().await;
info!("SIGTERM received, terminating");
MANAGER.write().await.terminate().await;
cleanup().await;
log::logger().flush();
std::process::exit(0);
}
async fn wait_reload() {
use tokio::signal::unix::{signal, SignalKind};
let Ok(mut sig) = signal(SignalKind::hangup())
.inspect_err(|e| error!("failed to listen to SIGHUP (will be ignored): {e}"))
else {
return;
};
loop {
sig.recv().await;
let _ = reload_config().await;
}
}
async fn reload_config() -> Result<()> {
let cfg = (fs::read(CFG_PATH).await)
.map_err(Error::ConfigRead)
.inspect_err(|e| error!("{e}"))?;
let cfg = serde_yaml::from_slice::<Config>(&cfg)
.map_err(Error::ConfigParse)
.inspect_err(|e| error!("{CFG_PATH}: {e}"))?;
info!("applying new config");
let r = MANAGER.write().await.apply_config(cfg).await;
match &r {
Ok(_) => info!("applied new config"),
Err(e) => info!("failed to apply new config: {e}"),
}
r
}
async fn ls() -> String {
let mut keys = String::new();
for (i, k) in MANAGER.read().await.runners.keys().enumerate() {
if i != 0 {
keys.push('\n');
}
keys.push_str(k);
}
keys
}
async fn status() -> String {
let status = MANAGER.read().await.status();
let mut table = tabled::builder::Builder::new();
table.push_record(["cgroup", "service", "PID", "state", "msg"]);
for (cg_svc, child) in status {
let (cg, svc) = cg_svc.split_once('/').unwrap();
let pid = child.pid.map(|p| p.to_string());
table.push_record([
cg,
svc,
pid.as_deref().unwrap_or(""),
&format!("{:?}", child.state),
child.msg.as_deref().unwrap_or(""),
]);
}
(table.build())
.with(tabled::settings::Style::psql())
.to_string()
}
async fn start(key: &str) -> Result<()> {
MANAGER.write().await.start(key).await
}
async fn stop(key: &str) -> Result<()> {
MANAGER.write().await.stop(key).await
}
async fn reload(key: &str) -> Result<()> {
MANAGER.read().await.reload(key).await
}
async fn sig(key: &str, sig: &str) -> Result<()> {
let sig: Signal = sig.parse().map_err(|_| Error::InvalidSignal(sig.into()))?;
signal(key, sig).await
}
async fn child_for(key: &str) -> Result<Child> {
MANAGER.read().await.child_for(key)
}
async fn signal(key: &str, sig: Signal) -> Result<()> {
child_for(key).await?.kill(sig)
}
fn child_key(cg: &str, svc: &str) -> String {
[cg, svc].join("/")
}
fn split_key(key: &str) -> Result<(&str, &str)> {
key.split_once('/').ok_or(Error::InvalidKey)
}
#[derive(Default)]
struct Manager {
cfg: Config,
procs: Map<String, watch::Receiver<Child>>,
runners: Map<String, mpsc::Sender<runner::Cmd>>,
}
impl Manager {
fn status(&self) -> Vec<(String, Child)> {
(self.procs.iter())
.map(|(n, c)| (n.clone(), c.borrow().clone()))
.collect()
}
fn child_for(&self, key: &str) -> Result<Child> {
(self.procs.get(key))
.map(|c| c.borrow().clone())
.ok_or_else(|| Error::NotRunning(key.into()))
}
async fn apply_config(&mut self, new_cfg: Config) -> Result<()> {
// create and configure cgroups
for (name, cg) in &new_cfg.cgroups {
let cg_path = PathBuf::from(cgroup::ROOT).join(name);
fs::create_dir_all(&cg_path)
.await
.map_err(Error::CgroupSetup)?;
fs::write(
cg_path.join("cgroup.subtree_control"),
cg.controllers.as_bytes(),
)
.await
.map_err(Error::CgroupSetup)?;
for (setting, value) in &cg.settings {
fs::write(cg_path.join(setting), value.as_bytes())
.await
.map_err(Error::CgroupSetup)?;
}
}
let new_svcs: Set<_> = new_cfg.service_keys().collect();
// stop removed services
let to_stop = Map::from_iter(self.runners.extract_if(.., |k, _| !new_svcs.contains(k)));
let mut stopped = Set::new();
for (key, runner_cmd) in to_stop {
if runner_cmd.send(runner::Cmd::Stop).await.is_err() {
// runner already dead
continue;
}
stopped.insert(key);
}
// start added services
for (key, cg, svc, service) in new_cfg.services() {
if self.runners.contains_key(&key) {
continue;
};
let cmd = self.spawn_runner(key, cg, svc, service.clone());
if let Err(e) = cmd.send(runner::Cmd::Start).await {
error!("runner instantly died: {e}");
}
}
// wait & cleanup stopped
for key in stopped {
let Some(mut child_rx) = self.procs.remove(&key) else {
continue;
};
let _ = child_rx
.wait_for(|c| matches!(c.state, State::Finalized))
.await;
}
self.cfg = new_cfg;
Ok(())
}
async fn terminate(&mut self) {
self.runners.clear();
for child in self.procs.values_mut() {
let _ = child
.wait_for(|c| matches!(c.state, State::Finalized))
.await;
}
self.procs.clear();
}
fn runner(&mut self, key: &str) -> Result<mpsc::Sender<runner::Cmd>> {
if let Some(c) = self.runners.get(key) {
return Ok(c.clone());
}
let (cg, svc) = split_key(key)?;
let service = self.cfg.service(key)?;
Ok(self.spawn_runner(key.into(), cg, svc, service.clone()))
}
fn spawn_runner(
&mut self,
key: String,
cg: &str,
svc: &str,
service: Service,
) -> mpsc::Sender<runner::Cmd> {
let (runner, child_rx, cmds_tx) = runner::new(cg, svc, service);
tokio::spawn(runner.run());
self.procs.insert(key.clone(), child_rx);
self.runners.insert(key, cmds_tx.clone());
cmds_tx
}
async fn cmd(&mut self, key: &str, cmd: runner::Cmd) -> Result<()> {
if self.runner(key)?.send(cmd).await.is_err() {
// runner died
self.runners.remove(key);
return Err(Error::RunnerDead);
}
Ok(())
}
async fn start(&mut self, key: &str) -> Result<()> {
self.cmd(key, runner::Cmd::Start).await
}
async fn stop(&mut self, key: &str) -> Result<()> {
self.cmd(key, runner::Cmd::Stop).await
}
async fn reload(&self, key: &str) -> Result<()> {
let proc = (self.procs.get(key)) //
.ok_or_else(|| Error::UnknownService(key.into()))?;
proc.borrow().reload()
}
}
impl Config {
fn cgroup(&self, cg: &str) -> Result<&CgroupConfig> {
self.cgroups
.get(cg)
.ok_or_else(|| Error::UnknownCgroup(cg.into()))
}
fn service(&self, key: &str) -> Result<&Service> {
let (cg, svc) = split_key(key)?;
self.cgroup(cg)?.service(svc)
}
fn service_keys(&self) -> impl Iterator<Item = String> {
(self.cgroups.iter())
.map(|(cg_name, cg)| cg.services.keys().map(move |n| child_key(cg_name, n)))
.flatten()
}
fn services(&self) -> impl Iterator<Item = (String, &String, &String, &Service)> {
(self.cgroups.iter())
.map(|(cg_name, cg)| {
cg.services
.iter()
.map(move |(n, service)| (child_key(cg_name, n), cg_name, n, service))
})
.flatten()
}
}
impl CgroupConfig {
fn service(&self, svc: &str) -> Result<&Vec<String>> {
self.services
.get(svc)
.ok_or_else(|| Error::UnknownService(svc.into()))
}
}
pub async fn complete() -> Vec<String> {
let mut r = vec![];
let Ok(rd) = ctl_exec(b"ls\n").await else {
return r;
};
let mut rd = rd.lines();
while let Some(line) = rd.next_line().await.ok().flatten() {
r.push(line);
}
r
}
+263
View File
@@ -0,0 +1,263 @@
use log::{error, warn};
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use std::num::NonZero;
use tokio::{
process, select,
sync::{mpsc, watch},
time::{sleep, sleep_until, Duration, Instant},
};
use super::{Error, Result, Service};
use crate::logger::Logger;
const LOG_PATH: &str = "/var/log";
const TERM_DELAY: Duration = Duration::from_secs(30);
const KILL_DELAY: Duration = Duration::from_secs(10);
const RESTART_DELAY: Duration = Duration::from_secs(8);
#[derive(Debug, Clone, Copy)]
pub enum Cmd {
Start,
Stop,
}
#[derive(Default, Clone, Copy, Debug)]
pub enum State {
#[default]
NeverStarted,
Starting,
Running,
Crashed,
Stopping,
Stopped,
Finalized,
}
pub fn new(
cg: impl Into<String>,
svc: impl Into<String>,
service: Service,
) -> (Runner, watch::Receiver<Child>, mpsc::Sender<Cmd>) {
let (manager, child_rx) = ProcessManager::new(service);
let (cmds_tx, cmds_rx) = mpsc::channel(1);
let r = Runner {
cg: cg.into(),
svc: svc.into(),
cmds_rx,
manager,
};
(r, child_rx, cmds_tx)
}
pub struct Runner {
cg: String,
svc: String,
cmds_rx: mpsc::Receiver<Cmd>,
manager: ProcessManager,
}
impl Runner {
pub async fn run(mut self) {
self.manager.update(State::NeverStarted);
loop {
let cmd = select! {
cmd = self.manager.manage() => {
cmd
}
cmd = self.cmds_rx.recv() => {
let Some(cmd) = cmd else {
break; // command side dropped
};
Some(cmd)
}
};
if let Some(cmd) = cmd {
self.process_cmd(cmd).await;
}
}
self.process_cmd(Cmd::Stop).await;
self.manager.update(State::Finalized);
}
async fn process_cmd(&mut self, cmd: Cmd) {
let cg = &self.cg;
let svc = &self.svc;
match cmd {
Cmd::Start => {
self.manager.start(cg, svc).await;
}
Cmd::Stop => {
self.manager.stop().await;
}
}
}
}
struct ProcessManager {
service: Service,
child_tx: watch::Sender<Child>,
process: Option<process::Child>,
restart_deadline: Option<Instant>,
}
impl ProcessManager {
fn new(service: Service) -> (Self, watch::Receiver<Child>) {
let (child_tx, child_rx) = watch::channel(Child::default());
let pm = Self {
service,
child_tx,
process: None,
restart_deadline: None,
};
(pm, child_rx)
}
/// runs a management iteration (ie: waiting for the child or a restart deadline).
async fn manage(&mut self) -> Option<Cmd> {
if let Some(process) = self.process.as_mut() {
let msg = match process.wait().await {
Ok(status) => status.to_string(),
Err(e) => e.to_string(),
};
self.crashed(msg);
self.process = None;
self.restart_deadline = Some(Instant::now() + RESTART_DELAY);
None
} else if let Some(deadline) = self.restart_deadline {
sleep_until(deadline).await;
Some(Cmd::Start)
} else {
std::future::pending().await
}
}
async fn start(&mut self, cg: &str, svc: &str) {
if self.process.is_some() {
return;
}
self.update(State::Starting);
let logger = Logger {
log_path: LOG_PATH.into(),
log_name: svc.into(),
with_prefix: false,
cgroup: Some(cg.into()),
};
let mut args = self.service.iter();
let Some(cmd) = args.next() else {
error!("{cg}/{svc}: empty command");
return;
};
let Ok(cmd) = (logger.setup(cmd, args).await)
.inspect_err(|e| self.crashed(format!("setup failed: {e}")))
else {
return;
};
let Ok(child) = logger
.spawn(cmd)
.inspect_err(|e| self.crashed(format!("exec failed: {e}")))
else {
return;
};
self.process = Some(child);
self.restart_deadline = None;
self.update(State::Running);
}
async fn stop(&mut self) {
self.restart_deadline = None;
let Some(mut process) = self.process.take() else {
return;
};
let Some(pid) = process.id() else {
let _ = process.wait().await; // already dead, reap it
self.update(State::Stopped);
return;
};
let pid = pid as i32;
self.update_full(pid, State::Stopping, None);
let pid = Pid::from_raw(pid);
let _ = kill(pid, Signal::SIGTERM).inspect_err(|e| error!("kill -TERM {pid} failed: {e}"));
select! {
_ = process.wait() => {
self.update(State::Stopped);
return
},
_ = sleep(TERM_DELAY) => {
warn!("process {pid} did not exit during the grace period, killing");
let _ = process.kill().await.inspect_err(|e| error!("kill -KILL {pid} failed: {e}"));
}
}
select! {
_ = process.wait() => {
self.update(State::Stopped);
return
},
_ = sleep(KILL_DELAY) => {
error!("process {pid} still alive after SIGKILL");
}
}
}
fn process_pid(&self) -> i32 {
(self.process.as_ref())
.and_then(|c| Some(c.id()? as i32))
.unwrap_or(0)
}
fn update(&self, state: State) {
let pid = self.process_pid();
self.update_full(pid, state, None);
}
fn update_full(&self, pid: i32, state: State, msg: Option<String>) {
self.child_tx.send_replace(Child {
pid: NonZero::new(pid),
state,
msg,
});
}
fn crashed(&self, msg: String) {
let pid = self.process_pid();
self.update_full(pid, State::Crashed, Some(msg));
}
}
#[derive(Clone, Default)]
pub struct Child {
pub pid: Option<NonZero<i32>>,
pub state: State,
pub msg: Option<String>,
}
impl Child {
pub fn reload(&self) -> Result<()> {
self.kill(Signal::SIGHUP)
}
pub fn kill(&self, sig: Signal) -> Result<()> {
let Some(pid) = self.pid else {
return Err(Error::ProcessExited);
};
kill(Pid::from_raw(pid.into()), sig).map_err(|e| Error::KillFailed(e))
}
}