Compare commits
54 Commits
b1bf8f3fb8
...
v1.2.2
| Author | SHA1 | Date | |
|---|---|---|---|
| 78d4d4f121 | |||
| 720d433fd9 | |||
| c0c2c4afdf | |||
| 71f4faa8cb | |||
| 5414b1d529 | |||
| dca467e21d | |||
| 8d36882fbd | |||
| b1bee39653 | |||
| 2e87e4d92f | |||
| b58c9fbeb5 | |||
| 15fe8c9ce6 | |||
| 6059d81b3d | |||
| f3b3a9b9c7 | |||
| a5026b884d | |||
| c19798f9f0 | |||
| dc936f52ab | |||
| 33fcfbd197 | |||
| 0f116e21b9 | |||
| aa7f15516c | |||
| 4619899e65 | |||
| 4b1edb2a55 | |||
| d449fc8dcf | |||
| ddc82199fb | |||
| 61d31bc22c | |||
| d2293df011 | |||
| 723cecff1b | |||
| e8c9ee9885 | |||
| 6a6536bdfb | |||
| a6dc420275 | |||
| d9fa31ec33 | |||
| 93e5570293 | |||
| fb3f8942d4 | |||
| 7acc9e9a3e | |||
| ac90b35142 | |||
| 298366a0aa | |||
| ecbbb82c7a | |||
| ebd2f21d42 | |||
| c4ed68d0e9 | |||
| 3fe6fc9222 | |||
| d7dfea2dec | |||
| a3d3ccfd25 | |||
| 9cef7a773e | |||
| 4ecee15b6b | |||
| 5e9f3e64d8 | |||
| 3cc2111ca7 | |||
| 1e047afac3 | |||
| 6f059287ec | |||
| bbea9b9c00 | |||
| be5db231d9 | |||
| b01c41b856 | |||
| 4ccda5039b | |||
| 852738bec3 | |||
| 7d02d8f932 | |||
| 52c23653ac |
+6
-2
@@ -1,8 +1,12 @@
|
||||
/target
|
||||
/dls
|
||||
/modd.conf
|
||||
/m1_bootstrap-config
|
||||
/config.yaml
|
||||
/dist
|
||||
/dkl
|
||||
/tmp
|
||||
|
||||
/config.yaml
|
||||
|
||||
# dls assets
|
||||
/cluster_*_*
|
||||
/host_*_*
|
||||
|
||||
Generated
+890
-586
File diff suppressed because it is too large
Load Diff
+19
-5
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "dkl"
|
||||
version = "1.0.0"
|
||||
version = "1.2.2"
|
||||
edition = "2024"
|
||||
|
||||
[profile.release]
|
||||
@@ -12,20 +12,34 @@ codegen-units = 1
|
||||
|
||||
[dependencies]
|
||||
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
|
||||
base32 = "0.5.1"
|
||||
base64 = "0.22.1"
|
||||
bytes = "1.10.1"
|
||||
chrono = { version = "0.4.41", default-features = false, features = ["now"] }
|
||||
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
|
||||
clap = { version = "4.5.40", features = ["derive", "env"] }
|
||||
clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] }
|
||||
env_logger = "0.11.8"
|
||||
eyre = "0.6.12"
|
||||
fastrand = "2.3.0"
|
||||
futures = "0.3.31"
|
||||
futures-util = "0.3.31"
|
||||
glob = "0.3.2"
|
||||
hex = "0.4.3"
|
||||
human-units = "0.5.3"
|
||||
log = "0.4.27"
|
||||
lz4 = "1.28.1"
|
||||
memchr = "2.8.0"
|
||||
nix = { version = "0.31.2", features = ["fs", "process", "signal", "user"] }
|
||||
openssl = "0.10.73"
|
||||
page_size = "0.6.0"
|
||||
reqwest = { version = "0.12.20", features = ["json", "stream"] }
|
||||
reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false }
|
||||
rpassword = "7.4.0"
|
||||
rust-argon2 = "3.0.0"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
serde_yaml = "0.9.34"
|
||||
serde_yaml = { version = "0.10.0", package = "serde_yaml_ng" }
|
||||
signal-hook = "0.4.4"
|
||||
tabled = "0.20.0"
|
||||
thiserror = "2.0.12"
|
||||
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
|
||||
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt", "signal"] }
|
||||
|
||||
|
||||
+2
-2
@@ -1,4 +1,4 @@
|
||||
from mcluseau/rust:1.88.0 as build
|
||||
from mcluseau/rust:1.95.0 as build
|
||||
|
||||
workdir /app
|
||||
copy . .
|
||||
@@ -10,6 +10,6 @@ run \
|
||||
&& find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ +
|
||||
|
||||
# ------------------------------------------------------------------------
|
||||
from alpine:3.22
|
||||
from alpine:3.23.4
|
||||
copy --from=build /dist/ /bin/
|
||||
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
cargo install --path . --root dist
|
||||
cargo install --locked --path . --root dist
|
||||
|
||||
Executable
+17
@@ -0,0 +1,17 @@
|
||||
set -ex
|
||||
tag=$(git describe --always)
|
||||
repo=novit.tech/direktil/dkl:$tag
|
||||
|
||||
docker build --push --platform=linux/amd64,linux/arm64 . -t $repo
|
||||
|
||||
publish() {
|
||||
arch=$1
|
||||
pf=$2
|
||||
|
||||
curl --user $(jq '.auths["novit.tech"].auth' ~/.docker/config.json -r |base64 -d) \
|
||||
--upload-file <(docker run --rm --platform $pf $repo cat /bin/dkl) \
|
||||
https://novit.tech/api/packages/direktil/generic/dkl/$tag/dkl.$arch
|
||||
}
|
||||
|
||||
publish x86_64 linux/amd64
|
||||
publish arm64 linux/arm64
|
||||
+83
-16
@@ -1,36 +1,103 @@
|
||||
use eyre::Result;
|
||||
use eyre::{Result, format_err};
|
||||
use log::info;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
|
||||
pub async fn files(files: &[crate::File], root: &str) -> Result<()> {
|
||||
for file in files {
|
||||
use crate::{base64_decode, File};
|
||||
|
||||
pub async fn files(files: &[File], root: &str, dry_run: bool) -> Result<()> {
|
||||
for f in files {
|
||||
if let Err(e) = file(f, root, dry_run).await {
|
||||
return Err(format_err!("{}: {e}", f.path))
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn file(file: &File, root: &str, dry_run: bool) -> Result<()> {
|
||||
let path = chroot(root, &file.path);
|
||||
let path = Path::new(&path);
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
if !dry_run && let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent).await?;
|
||||
}
|
||||
|
||||
use crate::FileKind as K;
|
||||
match &file.kind {
|
||||
K::Content(content) => fs::write(path, content.as_bytes()).await?,
|
||||
K::Dir(true) => fs::create_dir(path).await?,
|
||||
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
|
||||
K::Symlink(tgt) => fs::symlink(tgt, path).await?,
|
||||
use crate::{FileKind as K, FilePart as P};
|
||||
match file.kind().as_ref() {
|
||||
K::Skip => {
|
||||
info!("{}: kind is skip", file.path);
|
||||
return Ok(())
|
||||
},
|
||||
K::Content(content) => {
|
||||
if dry_run {
|
||||
info!(
|
||||
"would create {} ({} bytes from content)",
|
||||
file.path,
|
||||
content.len()
|
||||
);
|
||||
} else {
|
||||
fs::write(path, content.as_bytes()).await?;
|
||||
}
|
||||
}
|
||||
K::Content64(content) => {
|
||||
let content = base64_decode(content)?;
|
||||
if dry_run {
|
||||
info!(
|
||||
"would create {} ({} bytes from content64)",
|
||||
file.path,
|
||||
content.len()
|
||||
);
|
||||
} else {
|
||||
fs::write(path, content).await?
|
||||
}
|
||||
}
|
||||
K::Parts(parts) => {
|
||||
let mut assembly = Vec::new();
|
||||
for part in parts {
|
||||
match part {
|
||||
P::Content(content) => assembly.extend(content.as_bytes()),
|
||||
P::Content64(content) => assembly.extend(base64_decode(content)?),
|
||||
}
|
||||
}
|
||||
if dry_run {
|
||||
info!(
|
||||
"would create {} ({} bytes from parts)",
|
||||
file.path,
|
||||
assembly.len()
|
||||
);
|
||||
} else {
|
||||
fs::write(path, assembly).await?
|
||||
}
|
||||
}
|
||||
K::Dir => {
|
||||
if dry_run {
|
||||
info!("would create {} (directory)", file.path);
|
||||
} else {
|
||||
fs::create_dir(path).await?;
|
||||
}
|
||||
}
|
||||
K::Symlink(tgt) => {
|
||||
if dry_run {
|
||||
info!("would create {} (symlink to {})", file.path, tgt);
|
||||
} else {
|
||||
let _ = fs::remove_file(path).await; // we're ln --force
|
||||
fs::symlink(tgt, path).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match file.kind {
|
||||
K::Symlink(_) => {}
|
||||
_ => set_perms(path, file.mode).await?,
|
||||
if dry_run {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !file.is_symlink() {
|
||||
set_perms(path, file.mode).await?;
|
||||
}
|
||||
|
||||
info!("created {}", file.path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_perms(path: impl AsRef<Path>, mode: Option<u32>) -> std::io::Result<()> {
|
||||
if let Some(mode) = mode.filter(|m| *m != 0) {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
+301
-60
@@ -1,11 +1,16 @@
|
||||
use clap::{CommandFactory, Parser, Subcommand};
|
||||
use eyre::{format_err, Result};
|
||||
use human_units::Duration;
|
||||
use log::{debug, error};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command()]
|
||||
struct Cli {
|
||||
#[arg(long)]
|
||||
log_to: Option<PathBuf>,
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
@@ -22,23 +27,115 @@ enum Command {
|
||||
/// path prefix (aka chroot)
|
||||
#[arg(short = 'P', long, default_value = "/")]
|
||||
prefix: String,
|
||||
/// don't really write files
|
||||
#[arg(long)]
|
||||
dry_run: bool,
|
||||
},
|
||||
Logger {
|
||||
#[arg(long, short = 'p', default_value = "/var/log")]
|
||||
log_path: String,
|
||||
/// Path where the logs are stored
|
||||
#[arg(long, short = 'p', default_value = "/var/log", env = "DKL_LOG_PATH")]
|
||||
log_path: PathBuf,
|
||||
/// Name of the log instead of the command's basename
|
||||
#[arg(long, short = 'n')]
|
||||
log_name: Option<String>,
|
||||
log_name: Option<PathBuf>,
|
||||
/// prefix log lines with time & stream
|
||||
#[arg(long)]
|
||||
with_prefix: bool,
|
||||
command: String,
|
||||
/// exec command in this cgroup
|
||||
#[arg(long)]
|
||||
cgroup: Option<String>,
|
||||
command: PathBuf,
|
||||
args: Vec<String>,
|
||||
},
|
||||
Log {
|
||||
#[arg(long, short = 'p', default_value = "/var/log")]
|
||||
/// Path where the logs are stored
|
||||
#[arg(long, short = 'p', default_value = "/var/log", env = "DKL_LOG_PATH")]
|
||||
log_path: String,
|
||||
/// Name of the log set to operate on.
|
||||
log_name: String,
|
||||
since: Option<String>,
|
||||
until: Option<String>,
|
||||
#[command(subcommand)]
|
||||
op: LogOp,
|
||||
},
|
||||
Dynlay {
|
||||
layer: String,
|
||||
version: String,
|
||||
#[arg(
|
||||
long,
|
||||
short = 'u',
|
||||
default_value = "https://dkl.novit.io/dist/layers",
|
||||
env = "DKL_DYNLAY_URL"
|
||||
)]
|
||||
url_prefix: String,
|
||||
#[arg(
|
||||
long,
|
||||
short = 'd',
|
||||
default_value = "/opt/dynlay",
|
||||
env = "DKL_DYNLAY_DIR"
|
||||
)]
|
||||
layers_dir: String,
|
||||
#[arg(long, default_value = "/")]
|
||||
chroot: std::path::PathBuf,
|
||||
},
|
||||
Proxy {
|
||||
#[arg(long, short = 'l')]
|
||||
listen: Vec<SocketAddr>,
|
||||
targets: Vec<SocketAddr>,
|
||||
/// target polling interval
|
||||
#[arg(long, default_value = "30s")]
|
||||
poll: Duration,
|
||||
/// connect or check timeout
|
||||
#[arg(long, default_value = "5s")]
|
||||
timeout: Duration,
|
||||
},
|
||||
|
||||
Cg {
|
||||
#[command(subcommand)]
|
||||
cmd: CgCmd,
|
||||
},
|
||||
|
||||
Rc {
|
||||
#[command(subcommand)]
|
||||
cmd: RcCmd,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum CgCmd {
|
||||
Ls {
|
||||
#[arg(long)]
|
||||
root: Option<PathBuf>,
|
||||
#[arg(long, short = 'X')]
|
||||
exclude: Vec<String>,
|
||||
#[arg(long, short = 'C')]
|
||||
cols: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum RcCmd {
|
||||
Run,
|
||||
Ls,
|
||||
Status,
|
||||
ReloadConfig,
|
||||
Start {
|
||||
#[arg(add = completions(dkl::rc::complete))]
|
||||
key: String,
|
||||
},
|
||||
Stop {
|
||||
#[arg(add = completions(dkl::rc::complete))]
|
||||
key: String,
|
||||
},
|
||||
Reload {
|
||||
#[arg(add = completions(dkl::rc::complete))]
|
||||
key: String,
|
||||
},
|
||||
Sig {
|
||||
#[arg(add = completions(dkl::rc::complete))]
|
||||
key: String,
|
||||
signal: u32,
|
||||
},
|
||||
Ctl {
|
||||
args: Vec<String>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -48,10 +145,22 @@ async fn main() -> Result<()> {
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
env_logger::builder()
|
||||
.parse_filters("info")
|
||||
.parse_default_env()
|
||||
.init();
|
||||
{
|
||||
let mut builder = env_logger::builder();
|
||||
builder.parse_filters("info").parse_default_env();
|
||||
|
||||
if let Some(log_to) = cli.log_to {
|
||||
builder.target(env_logger::Target::Pipe(Box::new(
|
||||
std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(log_to)
|
||||
.unwrap(),
|
||||
)));
|
||||
}
|
||||
|
||||
builder.init();
|
||||
}
|
||||
|
||||
use Command as C;
|
||||
match cli.command {
|
||||
@@ -59,55 +168,184 @@ async fn main() -> Result<()> {
|
||||
config,
|
||||
filters,
|
||||
prefix,
|
||||
dry_run,
|
||||
} => {
|
||||
let filters = parse_globs(&filters)?;
|
||||
apply_config(&config, &filters, &prefix).await
|
||||
apply_config(&config, &filters, &prefix, dry_run).await
|
||||
}
|
||||
C::Logger {
|
||||
ref log_path,
|
||||
ref log_name,
|
||||
log_path,
|
||||
log_name,
|
||||
with_prefix,
|
||||
cgroup,
|
||||
command,
|
||||
args,
|
||||
} => {
|
||||
let command = command.as_str();
|
||||
let log_name = log_name.as_deref().unwrap_or_else(|| basename(command));
|
||||
let log_name = log_name.unwrap_or_else(|| command.file_prefix().unwrap().into());
|
||||
|
||||
dkl::logger::Logger {
|
||||
let logger = dkl::logger::Logger {
|
||||
log_path,
|
||||
log_name,
|
||||
with_prefix,
|
||||
}
|
||||
.run(command, &args)
|
||||
.await
|
||||
cgroup,
|
||||
};
|
||||
|
||||
let cmd = logger.setup(command, &args).await?;
|
||||
logger.exec(cmd).await
|
||||
}
|
||||
C::Log {
|
||||
log_path,
|
||||
log_name,
|
||||
since,
|
||||
until,
|
||||
op,
|
||||
} => op.run(&log_path, &log_name).await,
|
||||
C::Dynlay {
|
||||
ref layer,
|
||||
ref version,
|
||||
ref url_prefix,
|
||||
ref layers_dir,
|
||||
chroot,
|
||||
} => {
|
||||
let since = parse_ts_arg(since)?;
|
||||
let until = parse_ts_arg(until)?;
|
||||
|
||||
let mut files = dkl::logger::log_files(&log_path, &log_name).await?;
|
||||
files.sort();
|
||||
|
||||
let mut out = tokio::io::stdout();
|
||||
|
||||
for f in files {
|
||||
if !since.is_none_or(|since| f.timestamp >= since) {
|
||||
continue;
|
||||
}
|
||||
if !until.is_none_or(|until| f.timestamp <= until) {
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!("{f:?}");
|
||||
f.copy_to(&mut out).await?;
|
||||
dkl::dynlay::Dynlay {
|
||||
url_prefix,
|
||||
layers_dir,
|
||||
chroot,
|
||||
}
|
||||
Ok(())
|
||||
.install(layer, version)
|
||||
.await
|
||||
}
|
||||
C::Proxy {
|
||||
listen,
|
||||
targets,
|
||||
poll,
|
||||
timeout,
|
||||
} => Ok(dkl::proxy::Proxy {
|
||||
listen_addrs: listen,
|
||||
targets,
|
||||
poll: poll.into(),
|
||||
timeout: timeout.into(),
|
||||
}
|
||||
.run()
|
||||
.await
|
||||
.map(|_| ())?),
|
||||
|
||||
C::Cg { cmd } => match cmd {
|
||||
CgCmd::Ls {
|
||||
root,
|
||||
exclude,
|
||||
cols,
|
||||
} => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?),
|
||||
},
|
||||
|
||||
C::Rc { cmd } => match cmd {
|
||||
RcCmd::Run => Ok(dkl::rc::run().await?),
|
||||
RcCmd::Ls => Ok(dkl::rc::ctl(["ls"]).await?),
|
||||
RcCmd::Status => Ok(dkl::rc::ctl(["status"]).await?),
|
||||
RcCmd::ReloadConfig => Ok(dkl::rc::ctl(["reload-config"]).await?),
|
||||
RcCmd::Start { key } => Ok(dkl::rc::ctl(["start", &key]).await?),
|
||||
RcCmd::Stop { key } => Ok(dkl::rc::ctl(["stop", &key]).await?),
|
||||
RcCmd::Reload { key } => Ok(dkl::rc::ctl(["reload", &key]).await?),
|
||||
RcCmd::Sig { key, signal } => {
|
||||
Ok(dkl::rc::ctl(["sig", &key, &signal.to_string()]).await?)
|
||||
}
|
||||
RcCmd::Ctl { args } => Ok(dkl::rc::ctl(&args).await?),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_config(
|
||||
config_file: &str,
|
||||
filters: &[glob::Pattern],
|
||||
chroot: &str,
|
||||
dry_run: bool,
|
||||
) -> Result<()> {
|
||||
let config = fs::read_to_string(config_file).await?;
|
||||
let config: dkl::Config = serde_yaml::from_str(&config)?;
|
||||
|
||||
let files = if filters.is_empty() {
|
||||
config.files
|
||||
} else {
|
||||
(config.files.into_iter())
|
||||
.filter(|f| filters.iter().any(|filter| filter.matches(&f.path)))
|
||||
.collect()
|
||||
};
|
||||
|
||||
dkl::apply::files(&files, chroot, dry_run).await
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum LogOp {
|
||||
Ls {
|
||||
#[arg(short = 'l', long)]
|
||||
detail: bool,
|
||||
},
|
||||
Cleanup {
|
||||
/// days of log to keep
|
||||
days: u64,
|
||||
},
|
||||
Cat {
|
||||
/// print logs >= since
|
||||
since: Option<String>,
|
||||
/// print logs <= until
|
||||
until: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl LogOp {
|
||||
async fn run(self, log_path: &str, log_name: &str) -> Result<()> {
|
||||
let mut files = dkl::logger::log_files(&log_path, &log_name).await?;
|
||||
files.sort();
|
||||
|
||||
use LogOp as Op;
|
||||
match self {
|
||||
Op::Ls { detail } => {
|
||||
for f in files {
|
||||
let path = f.path.to_string_lossy();
|
||||
if detail {
|
||||
println!("{ts} {path}", ts = f.timestamp);
|
||||
} else {
|
||||
println!("{path}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Op::Cleanup { days } => {
|
||||
let deadline = chrono::Utc::now() - chrono::Days::new(days);
|
||||
let deadline = dkl::logger::trunc_ts(deadline);
|
||||
debug!("cleanup {log_name} logs < {deadline}");
|
||||
|
||||
for f in files {
|
||||
if f.timestamp < deadline {
|
||||
debug!("removing {}", f.path.to_string_lossy());
|
||||
fs::remove_file(f.path).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Op::Cat { since, until } => {
|
||||
let since = parse_ts_arg(since)?;
|
||||
let until = parse_ts_arg(until)?;
|
||||
|
||||
let mut out = tokio::io::stdout();
|
||||
|
||||
for f in files {
|
||||
if !since.is_none_or(|since| f.timestamp >= since) {
|
||||
continue;
|
||||
}
|
||||
if !until.is_none_or(|until| f.timestamp <= until) {
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"cat {path} (timestamp={ts}, compressed={comp})",
|
||||
path = f.path.to_string_lossy(),
|
||||
ts = f.timestamp.to_rfc3339(),
|
||||
comp = f.compressed
|
||||
);
|
||||
if let Err(e) = f.copy_to(&mut out).await {
|
||||
error!("{file}: {e}", file = f.path.to_string_lossy());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,25 +360,6 @@ fn parse_ts_arg(ts: Option<String>) -> Result<Option<dkl::logger::Timestamp>> {
|
||||
}
|
||||
}
|
||||
|
||||
fn basename(path: &str) -> &str {
|
||||
path.rsplit_once('/').map_or(path, |split| split.1)
|
||||
}
|
||||
|
||||
async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> {
|
||||
let config = fs::read_to_string(config_file).await?;
|
||||
let config: dkl::Config = serde_yaml::from_str(&config)?;
|
||||
|
||||
let files = if filters.is_empty() {
|
||||
config.files
|
||||
} else {
|
||||
(config.files.into_iter())
|
||||
.filter(|f| filters.iter().any(|filter| filter.matches(&f.path)))
|
||||
.collect()
|
||||
};
|
||||
|
||||
dkl::apply::files(&files, chroot).await
|
||||
}
|
||||
|
||||
fn parse_globs(filters: &[String]) -> Result<Vec<glob::Pattern>> {
|
||||
let mut errors = false;
|
||||
let filters = (filters.iter())
|
||||
@@ -160,3 +379,25 @@ fn parse_globs(filters: &[String]) -> Result<Vec<glob::Pattern>> {
|
||||
|
||||
Ok(filters)
|
||||
}
|
||||
|
||||
use clap_complete::{ArgValueCandidates, CompletionCandidate};
|
||||
|
||||
fn completions(f: impl AsyncFn() -> Vec<String> + Send + Sync + 'static) -> ArgValueCandidates {
|
||||
let f = std::sync::Arc::new(f);
|
||||
|
||||
ArgValueCandidates::new(move || {
|
||||
let f = f.clone();
|
||||
std::thread::spawn(move || {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async move { f().await })
|
||||
})
|
||||
.join()
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(CompletionCandidate::new)
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
}
|
||||
|
||||
+161
-35
@@ -1,14 +1,18 @@
|
||||
use bytes::Bytes;
|
||||
use clap::{CommandFactory, Parser, Subcommand};
|
||||
use eyre::{Result, format_err};
|
||||
use eyre::format_err;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tokio::fs;
|
||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
||||
|
||||
use dkl::dls;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command()]
|
||||
struct Cli {
|
||||
#[arg(long, default_value = "http://[::1]:7606")]
|
||||
#[arg(long, default_value = "http://[::1]:7606", env = "DLS_URL")]
|
||||
dls: String,
|
||||
|
||||
#[command(subcommand)]
|
||||
@@ -25,9 +29,40 @@ enum Command {
|
||||
},
|
||||
Hosts,
|
||||
Host {
|
||||
#[arg(short = 'o', long)]
|
||||
out: Option<String>,
|
||||
host: String,
|
||||
asset: Option<String>,
|
||||
},
|
||||
#[command(subcommand)]
|
||||
DlSet(DlSet),
|
||||
/// hash a password
|
||||
Hash {
|
||||
salt: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum DlSet {
|
||||
Sign {
|
||||
#[arg(short = 'e', long, default_value = "1d")]
|
||||
expiry: String,
|
||||
#[arg(value_parser = parse_download_set_item)]
|
||||
items: Vec<dls::DownloadSetItem>,
|
||||
},
|
||||
Show {
|
||||
#[arg(env = "DLS_DLSET")]
|
||||
signed_set: String,
|
||||
},
|
||||
Fetch {
|
||||
#[arg(long, env = "DLS_DLSET")]
|
||||
signed_set: String,
|
||||
#[arg(short = 'o', long)]
|
||||
out: Option<String>,
|
||||
kind: String,
|
||||
name: String,
|
||||
asset: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
@@ -62,7 +97,7 @@ enum ClusterCommand {
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
async fn main() -> eyre::Result<()> {
|
||||
clap_complete::CompleteEnv::with_factory(Cli::command).complete();
|
||||
|
||||
let cli = Cli::parse();
|
||||
@@ -72,14 +107,16 @@ async fn main() -> Result<()> {
|
||||
.parse_default_env()
|
||||
.init();
|
||||
|
||||
let token = std::env::var("DLS_TOKEN").map_err(|_| format_err!("DLS_TOKEN should be set"))?;
|
||||
|
||||
let dls = dls::Client::new(cli.dls, token);
|
||||
let dls = || {
|
||||
let token = std::env::var("DLS_TOKEN").expect("DLS_TOKEN should be set");
|
||||
dls::Client::new(cli.dls, token)
|
||||
};
|
||||
|
||||
use Command as C;
|
||||
match cli.command {
|
||||
C::Clusters => write_json(&dls.clusters().await?),
|
||||
C::Clusters => write_json(&dls().clusters().await?),
|
||||
C::Cluster { cluster, command } => {
|
||||
let dls = dls();
|
||||
let cluster = dls.cluster(cluster);
|
||||
|
||||
use ClusterCommand as CC;
|
||||
@@ -124,45 +161,120 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
}
|
||||
C::Hosts => write_json(&dls.hosts().await?),
|
||||
C::Host { host, asset } => {
|
||||
C::Hosts => write_json(&dls().hosts().await?),
|
||||
C::Host { out, host, asset } => {
|
||||
let dls = dls();
|
||||
let host_name = host.clone();
|
||||
let host = dls.host(host);
|
||||
match asset {
|
||||
None => write_json(&host.config().await?),
|
||||
Some(asset) => {
|
||||
let mut stream = host.asset(&asset).await?;
|
||||
|
||||
let out_path = format!("{host_name}_{asset}");
|
||||
eprintln!("writing {host_name} asset {asset} to {out_path}");
|
||||
|
||||
let out = tokio::fs::File::options()
|
||||
.mode(0o600)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(out_path)
|
||||
.await?;
|
||||
let mut out = tokio::io::BufWriter::new(out);
|
||||
|
||||
let mut n = 0u64;
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk?;
|
||||
n += chunk.len() as u64;
|
||||
eprint!("wrote {n} bytes\r");
|
||||
out.write_all(&chunk).await?;
|
||||
}
|
||||
eprintln!();
|
||||
|
||||
out.flush().await?;
|
||||
let stream = host.asset(&asset).await?;
|
||||
let mut out = create_asset_file(out, "host", &host_name, &asset).await?;
|
||||
copy_stream(stream, &mut out).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
C::DlSet(set) => match set {
|
||||
DlSet::Sign { expiry, items } => {
|
||||
let req = dls::DownloadSetReq { expiry, items };
|
||||
let signed = dls().sign_dl_set(&req).await?;
|
||||
println!("{signed}");
|
||||
}
|
||||
DlSet::Show { signed_set } => {
|
||||
let raw = base32::decode(base32::Alphabet::Rfc4648 { padding: false }, &signed_set)
|
||||
.ok_or(format_err!("invalid dlset"))?;
|
||||
|
||||
let sig_len = raw[0] as usize;
|
||||
let (sig, data) = raw[1..].split_at(sig_len);
|
||||
println!("signature: {}...", hex::encode(&sig[..16]));
|
||||
|
||||
let data = lz4::Decoder::new(data)?;
|
||||
let data = std::io::read_to_string(data)?;
|
||||
|
||||
let (expiry, items) = data.split_once('|').ok_or(format_err!("invalid dlset"))?;
|
||||
let expiry = i64::from_str_radix(expiry, 16)?;
|
||||
let expiry = chrono::DateTime::from_timestamp(expiry, 0).unwrap();
|
||||
|
||||
println!("expires on {expiry}");
|
||||
|
||||
for item in items.split('|') {
|
||||
let mut parts = item.split(':');
|
||||
let Some(kind) = parts.next() else {
|
||||
continue;
|
||||
};
|
||||
let Some(name) = parts.next() else {
|
||||
continue;
|
||||
};
|
||||
for asset in parts {
|
||||
println!("- {kind} {name} {asset}");
|
||||
}
|
||||
}
|
||||
}
|
||||
DlSet::Fetch {
|
||||
signed_set,
|
||||
out,
|
||||
kind,
|
||||
name,
|
||||
asset,
|
||||
} => {
|
||||
let dls = dls();
|
||||
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
|
||||
let mut out = create_asset_file(out, &kind, &name, &asset).await?;
|
||||
copy_stream(stream, &mut out).await?;
|
||||
}
|
||||
},
|
||||
C::Hash { salt } => {
|
||||
let salt = dkl::base64_decode(&salt)?;
|
||||
let passphrase = rpassword::prompt_password("password to hash: ")?;
|
||||
let hash = dls::store::hash_password(&salt, &passphrase)?;
|
||||
println!("hash (hex): {}", hex::encode(&hash));
|
||||
println!("hash (base64): {}", dkl::base64_encode(&hash));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_asset_file(
|
||||
path: Option<String>,
|
||||
kind: &str,
|
||||
name: &str,
|
||||
asset: &str,
|
||||
) -> std::io::Result<fs::File> {
|
||||
let path = &path.unwrap_or(format!("{kind}_{name}_{asset}"));
|
||||
eprintln!("writing {kind} {name} asset {asset} to {path}");
|
||||
(fs::File::options().write(true).create(true).truncate(true))
|
||||
.mode(0o600)
|
||||
.open(path)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn copy_stream(
|
||||
mut stream: impl Stream<Item = reqwest::Result<Bytes>> + Unpin,
|
||||
out: &mut (impl AsyncWrite + Unpin),
|
||||
) -> std::io::Result<()> {
|
||||
let mut out = tokio::io::BufWriter::new(out);
|
||||
|
||||
let info_delay = Duration::from_secs(1);
|
||||
let mut ts = SystemTime::now();
|
||||
|
||||
let mut n = 0u64;
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk.map_err(|e| std::io::Error::other(e))?;
|
||||
n += chunk.len() as u64;
|
||||
out.write_all(&chunk).await?;
|
||||
|
||||
if ts.elapsed().is_ok_and(|t| t >= info_delay) {
|
||||
eprint!("wrote {n} bytes\r");
|
||||
ts = SystemTime::now();
|
||||
}
|
||||
}
|
||||
eprintln!("wrote {n} bytes");
|
||||
|
||||
out.flush().await
|
||||
}
|
||||
|
||||
fn write_json<T: serde::ser::Serialize>(v: &T) {
|
||||
let data = serde_json::to_string_pretty(v).expect("value should serialize to json");
|
||||
println!("{data}");
|
||||
@@ -172,6 +284,20 @@ fn write_raw(raw: &[u8]) {
|
||||
use std::io::Write;
|
||||
|
||||
let mut out = std::io::stdout();
|
||||
out.write(raw).expect("stdout write");
|
||||
out.write_all(raw).expect("stdout write");
|
||||
out.flush().expect("stdout flush");
|
||||
}
|
||||
|
||||
fn parse_download_set_item(s: &str) -> Result<dls::DownloadSetItem, std::io::Error> {
|
||||
let err = |s: &str| std::io::Error::other(s);
|
||||
|
||||
let mut parts = s.split(':');
|
||||
|
||||
let item = dls::DownloadSetItem {
|
||||
kind: parts.next().ok_or(err("no kind"))?.to_string(),
|
||||
name: parts.next().ok_or(err("no name"))?.to_string(),
|
||||
assets: parts.map(|p| p.to_string()).collect(),
|
||||
};
|
||||
|
||||
Ok(item)
|
||||
}
|
||||
|
||||
+112
-45
@@ -2,7 +2,7 @@ use std::collections::BTreeMap as Map;
|
||||
|
||||
pub const TAKE_ALL: i16 = -1;
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Config {
|
||||
pub anti_phishing_code: String,
|
||||
|
||||
@@ -14,20 +14,22 @@ pub struct Config {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub resolv_conf: Option<String>,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub vpns: Map<String, String>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub networks: Vec<Network>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub auths: Vec<Auth>,
|
||||
#[serde(default)]
|
||||
pub ssh: SSHServer,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub pre_lvm_crypt: Vec<CryptDev>,
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub lvm: Vec<LvmVG>,
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub crypt: Vec<CryptDev>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
@@ -36,7 +38,20 @@ pub struct Config {
|
||||
pub bootstrap: Bootstrap,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
impl Config {
|
||||
pub fn new(bootstrap_dev: String) -> Self {
|
||||
Self {
|
||||
anti_phishing_code: "Direktil<3".into(),
|
||||
bootstrap: Bootstrap {
|
||||
dev: bootstrap_dev,
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Auth {
|
||||
pub name: String,
|
||||
#[serde(alias = "sshKey")]
|
||||
@@ -46,18 +61,32 @@ pub struct Auth {
|
||||
pub password: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Network {
|
||||
pub name: String,
|
||||
pub interfaces: Vec<NetworkInterface>,
|
||||
pub script: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct NetworkInterface {
|
||||
pub var: String,
|
||||
pub n: i16,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub regexps: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub udev: Option<UdevFilter>,
|
||||
}
|
||||
|
||||
impl Default for NetworkInterface {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
var: "iface".into(),
|
||||
n: 1,
|
||||
regexps: Vec::new(),
|
||||
udev: Some(UdevFilter::Eq("INTERFACE".into(), "eth0".into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
@@ -74,9 +103,9 @@ impl Default for SSHServer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct LvmVG {
|
||||
#[serde(alias = "vg")]
|
||||
#[serde(rename = "vg", alias = "name")]
|
||||
pub name: String,
|
||||
pub pvs: LvmPV,
|
||||
|
||||
@@ -86,7 +115,7 @@ pub struct LvmVG {
|
||||
pub lvs: Vec<LvmLV>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct LvmLVDefaults {
|
||||
#[serde(default)]
|
||||
pub fs: Filesystem,
|
||||
@@ -94,9 +123,10 @@ pub struct LvmLVDefaults {
|
||||
pub raid: Raid,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Filesystem {
|
||||
#[default]
|
||||
Ext4,
|
||||
Xfs,
|
||||
Btrfs,
|
||||
@@ -115,13 +145,7 @@ impl Filesystem {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Filesystem {
|
||||
fn default() -> Self {
|
||||
Filesystem::Ext4
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct LvmLV {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
@@ -132,61 +156,104 @@ pub struct LvmLV {
|
||||
pub size: LvSize,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum LvSize {
|
||||
Size(String),
|
||||
Extents(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct LvmPV {
|
||||
pub n: i16,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub regexps: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub udev: Option<UdevFilter>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct CryptDev {
|
||||
pub name: String,
|
||||
#[serde(flatten)]
|
||||
pub filter: DevFilter,
|
||||
// hit the limit of enum representation here (flatten + enum variant case)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub dev: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub prefix: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub udev: Option<UdevFilter>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub optional: Option<bool>,
|
||||
}
|
||||
impl CryptDev {
|
||||
pub fn filter(&self) -> DevFilter<'_> {
|
||||
if let Some(dev) = self.dev.as_deref() {
|
||||
DevFilter::Dev(dev)
|
||||
} else if let Some(prefix) = self.prefix.as_deref() {
|
||||
DevFilter::Prefix(prefix)
|
||||
} else if let Some(udev) = self.udev.as_ref() {
|
||||
DevFilter::Udev(udev)
|
||||
} else {
|
||||
DevFilter::None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn optional(&self) -> bool {
|
||||
self.optional.unwrap_or_else(|| self.filter.is_prefix())
|
||||
self.optional.unwrap_or_else(|| match self.filter() {
|
||||
DevFilter::None => true,
|
||||
DevFilter::Dev(_) => false,
|
||||
DevFilter::Prefix(_) => true,
|
||||
DevFilter::Udev(_) => true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[test]
|
||||
fn test_parse_crypt_dev() {
|
||||
for s in [
|
||||
"name: sys0\ndev: /dev/sda\n",
|
||||
"name: crypt-\nprefix: /dev/sd\n",
|
||||
"name: crypt-${name}\nudev: !glob [ DEVNAME, /dev/sd* ]\n",
|
||||
] {
|
||||
let dev: CryptDev = serde_yaml::from_str(s).unwrap();
|
||||
dev.filter();
|
||||
dev.optional();
|
||||
}
|
||||
}
|
||||
|
||||
pub enum DevFilter<'t> {
|
||||
None,
|
||||
Dev(&'t str),
|
||||
Prefix(&'t str),
|
||||
Udev(&'t UdevFilter),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum DevFilter {
|
||||
Dev(String),
|
||||
Prefix(String),
|
||||
}
|
||||
impl DevFilter {
|
||||
pub fn is_dev(&self) -> bool {
|
||||
match self {
|
||||
Self::Dev(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
pub fn is_prefix(&self) -> bool {
|
||||
match self {
|
||||
Self::Prefix(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
pub enum UdevFilter {
|
||||
Has(String),
|
||||
Eq(String, String),
|
||||
Glob(String, String),
|
||||
And(Vec<UdevFilter>),
|
||||
Or(Vec<UdevFilter>),
|
||||
Not(Box<UdevFilter>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Raid {
|
||||
pub mirrors: Option<u8>,
|
||||
pub stripes: Option<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Bootstrap {
|
||||
pub dev: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub seed: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub seed_ca: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub seed_servername: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub seed_proxy: Option<String>,
|
||||
}
|
||||
|
||||
+381
@@ -0,0 +1,381 @@
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
use std::path::{Path as StdPath, PathBuf};
|
||||
use std::rc::Rc;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::{fs, human::Human};
|
||||
|
||||
pub const ROOT: &str = "/sys/fs/cgroup";
|
||||
|
||||
pub async fn ls(
|
||||
parent: Option<impl AsRef<StdPath>>,
|
||||
exclude: &[String],
|
||||
columns: Option<&str>,
|
||||
) -> fs::Result<()> {
|
||||
let mut root = PathBuf::from(ROOT);
|
||||
if let Some(parent) = parent {
|
||||
root = root.join(parent);
|
||||
}
|
||||
|
||||
let cols: [(&str, fn(&Cgroup) -> String); _] = [
|
||||
("wset", |cg| cg.memory.working_set().human()),
|
||||
("anon", |cg| cg.memory.stat.anon.human()),
|
||||
("min", |cg| cg.memory.min.human()),
|
||||
("low", |cg| cg.memory.low.human()),
|
||||
("high", |cg| cg.memory.high.human()),
|
||||
("max", |cg| cg.memory.max.human()),
|
||||
];
|
||||
let cols = if let Some(columns) = columns {
|
||||
(cols.into_iter())
|
||||
.filter(|(n, _)| columns.split(',').any(|col| &col == n))
|
||||
.collect()
|
||||
} else {
|
||||
cols.to_vec()
|
||||
};
|
||||
|
||||
let mut table = tabled::builder::Builder::new();
|
||||
table.push_record(["cgroup"].into_iter().chain(cols.iter().map(|(n, _)| *n)));
|
||||
|
||||
let mut todo = vec![(Cgroup::root(root).await?, vec![], true)];
|
||||
while let Some((cg, p_lasts, last)) = todo.pop() {
|
||||
let mut name = String::new();
|
||||
for last in p_lasts.iter().skip(1) {
|
||||
name.push_str(if *last { " " } else { "| " });
|
||||
}
|
||||
if !p_lasts.is_empty() {
|
||||
name.push_str(if last { "`- " } else { "|- " });
|
||||
}
|
||||
name.push_str(&cg.name());
|
||||
|
||||
table.push_record([name].into_iter().chain(cols.iter().map(|(_, f)| f(&cg))));
|
||||
|
||||
let mut p_lasts = p_lasts.clone();
|
||||
p_lasts.push(last);
|
||||
|
||||
let mut children = cg.read_children().await?;
|
||||
children.sort();
|
||||
todo.extend(
|
||||
(children.into_iter().rev())
|
||||
.filter(|c| !exclude.iter().any(|x| x == &c.path.full_name()))
|
||||
.enumerate()
|
||||
.map(|(i, child)| (child, p_lasts.clone(), i == 0)),
|
||||
);
|
||||
}
|
||||
|
||||
use tabled::settings::{
|
||||
object::{Column, Row},
|
||||
Alignment, Modify,
|
||||
};
|
||||
let mut table = table.build();
|
||||
table.with(tabled::settings::Style::psql());
|
||||
table.with(Alignment::right());
|
||||
table.with(Modify::list(Column::from(0), Alignment::left()));
|
||||
table.with(Modify::list(Row::from(0), Alignment::left()));
|
||||
|
||||
println!("{}", table);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct Cgroup {
|
||||
path: Rc<Path>,
|
||||
children: Vec<PathBuf>,
|
||||
memory: Memory,
|
||||
}
|
||||
|
||||
impl Cgroup {
|
||||
pub async fn root(path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let path = path.as_ref();
|
||||
Self::read(Path::root(path), path).await
|
||||
}
|
||||
|
||||
async fn read(cg_path: Rc<Path>, path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let path = path.as_ref();
|
||||
|
||||
use fs::Error as E;
|
||||
|
||||
let mut rd = fs::read_dir(path).await?;
|
||||
|
||||
let mut cg = Self {
|
||||
path: cg_path,
|
||||
children: Vec::new(),
|
||||
memory: Memory::default(),
|
||||
};
|
||||
|
||||
while let Some(entry) = (rd.next_entry().await).map_err(|e| E::ReadDir(path.into(), e))? {
|
||||
let path = entry.path();
|
||||
|
||||
let Some(file_name) = path.file_name() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if (entry.file_type().await)
|
||||
.map_err(|e| E::Stat(path.clone(), e))?
|
||||
.is_dir()
|
||||
{
|
||||
cg.children.push(file_name.into());
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = file_name.as_encoded_bytes();
|
||||
let Some(idx) = file_name.iter().position(|b| *b == b'.') else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let (controller, param) = file_name.split_at(idx);
|
||||
let param = ¶m[1..];
|
||||
|
||||
match controller {
|
||||
b"memory" => match param {
|
||||
b"current" => cg.memory.current = read_parse(path).await?,
|
||||
b"low" => cg.memory.low = read_parse(path).await?,
|
||||
b"high" => cg.memory.high = read_parse(path).await?,
|
||||
b"min" => cg.memory.min = read_parse(path).await?,
|
||||
b"max" => cg.memory.max = read_parse(path).await?,
|
||||
b"stat" => cg.memory.stat.read_from(path).await?,
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cg)
|
||||
}
|
||||
|
||||
async fn read_children(&self) -> fs::Result<Vec<Self>> {
|
||||
let mut r = Vec::with_capacity(self.children.len());
|
||||
|
||||
let mut dir = PathBuf::from(self.path.as_ref());
|
||||
|
||||
for child_name in &self.children {
|
||||
dir.push(child_name);
|
||||
let child_path = Path::Child(self.path.clone(), child_name.into());
|
||||
r.push(Self::read(child_path.into(), &dir).await?);
|
||||
dir.pop();
|
||||
}
|
||||
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
pub fn name(&self) -> Cow<'_, str> {
|
||||
self.path.name()
|
||||
}
|
||||
|
||||
pub fn full_name(&self) -> String {
|
||||
self.path.full_name()
|
||||
}
|
||||
|
||||
pub fn children(&self) -> impl Iterator<Item = &StdPath> {
|
||||
self.children.iter().map(|n| n.as_path())
|
||||
}
|
||||
|
||||
pub async fn read_child(&self, name: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||
let name = name.as_ref();
|
||||
let mut dir = PathBuf::from(self.path.as_ref());
|
||||
dir.push(name);
|
||||
Self::read(self.path.child(name), &dir).await
|
||||
}
|
||||
|
||||
pub async fn write_param(
|
||||
&self,
|
||||
name: impl AsRef<StdPath>,
|
||||
value: impl AsRef<[u8]>,
|
||||
) -> fs::Result<()> {
|
||||
let cg_dir = PathBuf::from(self.path.as_ref());
|
||||
fs::write(cg_dir.join(name), value).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Cgroup {
|
||||
fn eq(&self, o: &Self) -> bool {
|
||||
&self.path == &o.path
|
||||
}
|
||||
}
|
||||
impl Eq for Cgroup {}
|
||||
|
||||
impl Ord for Cgroup {
|
||||
fn cmp(&self, o: &Self) -> std::cmp::Ordering {
|
||||
self.path.cmp(&o.path)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for Cgroup {
|
||||
fn partial_cmp(&self, o: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(o))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
enum Path {
|
||||
Root(PathBuf),
|
||||
Child(Rc<Path>, PathBuf),
|
||||
}
|
||||
|
||||
impl Path {
|
||||
fn name(&self) -> Cow<'_, str> {
|
||||
match self {
|
||||
Self::Root(_) => "/".into(),
|
||||
Self::Child(_, n) => n.to_string_lossy(),
|
||||
}
|
||||
}
|
||||
|
||||
fn full_name(&self) -> String {
|
||||
use Path::*;
|
||||
match self {
|
||||
Root(_) => "/".into(),
|
||||
Child(parent, _) => match parent.as_ref() {
|
||||
Root(_) => self.name().into(),
|
||||
Child(_, _) => format!("{}/{}", parent.full_name(), self.name()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn depth(&self) -> usize {
|
||||
use Path::*;
|
||||
match self {
|
||||
Root(_) => 0,
|
||||
Child(p, _) => 1 + p.depth(),
|
||||
}
|
||||
}
|
||||
|
||||
fn root(dir: impl Into<PathBuf>) -> Rc<Self> {
|
||||
Rc::new(Self::Root(dir.into()))
|
||||
}
|
||||
|
||||
fn child(self: &Rc<Self>, name: impl Into<PathBuf>) -> Rc<Self> {
|
||||
Rc::new(Self::Child(self.clone(), name.into()))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Path> for PathBuf {
|
||||
fn from(mut p: &Path) -> Self {
|
||||
let mut stack = Vec::with_capacity(p.depth() + 1);
|
||||
loop {
|
||||
match p {
|
||||
Path::Root(root_path) => {
|
||||
stack.push(root_path);
|
||||
break;
|
||||
}
|
||||
Path::Child(parent, n) => {
|
||||
stack.push(n);
|
||||
p = parent;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let len = stack.iter().map(|p| p.as_os_str().len() + 1).sum::<usize>() - 1;
|
||||
|
||||
let mut buf = PathBuf::with_capacity(len);
|
||||
buf.extend(stack.into_iter().rev());
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_path_to_pathbuf() {
|
||||
let root = Path::root("/a/b");
|
||||
let c1 = root.child("c1");
|
||||
let c1_1 = c1.child("c1-1");
|
||||
|
||||
assert_eq!(PathBuf::from("/a/b/c1"), PathBuf::from(c1.as_ref()));
|
||||
assert_eq!(PathBuf::from("/a/b/c1/c1-1"), PathBuf::from(c1_1.as_ref()));
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Memory {
|
||||
current: Option<u64>,
|
||||
low: Option<u64>,
|
||||
high: Option<Max>,
|
||||
min: Option<u64>,
|
||||
max: Option<Max>,
|
||||
stat: MemoryStat,
|
||||
}
|
||||
|
||||
impl Memory {
|
||||
/// working set as defined by cAdvisor
|
||||
/// (https://github.com/google/cadvisor/blob/e1ccfa9b4cf2e17d74e0f5526b6487b74b704503/container/libcontainer/handler.go#L853-L862)
|
||||
fn working_set(&self) -> Option<u64> {
|
||||
let cur = self.current?;
|
||||
let inactive = self.stat.inactive_file?;
|
||||
(inactive <= cur).then(|| cur - inactive)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct MemoryStat {
|
||||
anon: Option<u64>,
|
||||
file: Option<u64>,
|
||||
kernel: Option<u64>,
|
||||
kernel_stack: Option<u64>,
|
||||
pagetables: Option<u64>,
|
||||
shmem: Option<u64>,
|
||||
inactive_file: Option<u64>,
|
||||
}
|
||||
|
||||
enum Max {
|
||||
Num(u64),
|
||||
Max,
|
||||
}
|
||||
|
||||
impl Display for Max {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Num(n) => write!(f, "{n}"),
|
||||
Self::Max => f.write_str("max"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Human for Max {
|
||||
fn human(&self) -> String {
|
||||
match self {
|
||||
Self::Num(n) => n.human(),
|
||||
Self::Max => "+∞".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Max {
|
||||
type Err = std::num::ParseIntError;
|
||||
fn from_str(s: &str) -> std::result::Result<Self, <Self as FromStr>::Err> {
|
||||
Ok(match s {
|
||||
"max" => Self::Max,
|
||||
s => Self::Num(s.parse()?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_parse<T: FromStr>(path: impl AsRef<StdPath>) -> fs::Result<Option<T>>
|
||||
where
|
||||
T::Err: Display,
|
||||
{
|
||||
let path = path.as_ref();
|
||||
|
||||
(fs::read_to_string(path).await?)
|
||||
.trim_ascii()
|
||||
.parse()
|
||||
.map_err(|e| fs::Error::Other(path.into(), format!("parse failed: {e}")))
|
||||
.map(|v| Some(v))
|
||||
}
|
||||
|
||||
impl MemoryStat {
|
||||
async fn read_from(&mut self, path: impl AsRef<StdPath>) -> fs::Result<()> {
|
||||
for line in (fs::read_to_string(path).await?).lines() {
|
||||
let Some((key, value)) = line.split_once(' ') else {
|
||||
continue;
|
||||
};
|
||||
let value = value.parse::<u64>().ok();
|
||||
match key {
|
||||
"anon" => self.anon = value,
|
||||
"file" => self.file = value,
|
||||
"kernel" => self.kernel = value,
|
||||
"kernel_stack" => self.kernel_stack = value,
|
||||
"pagetables" => self.pagetables = value,
|
||||
"shmem" => self.shmem = value,
|
||||
"inactive_file" => self.inactive_file = value,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
+95
-12
@@ -6,6 +6,8 @@ use std::collections::BTreeMap as Map;
|
||||
use std::fmt::Display;
|
||||
use std::net::IpAddr;
|
||||
|
||||
pub mod store;
|
||||
|
||||
pub struct Client {
|
||||
base_url: String,
|
||||
token: String,
|
||||
@@ -33,7 +35,7 @@ impl Client {
|
||||
self.get_json("clusters").await
|
||||
}
|
||||
|
||||
pub fn cluster(&self, name: String) -> Cluster {
|
||||
pub fn cluster(&self, name: String) -> Cluster<'_> {
|
||||
Cluster { dls: self, name }
|
||||
}
|
||||
|
||||
@@ -41,16 +43,30 @@ impl Client {
|
||||
self.get_json("hosts").await
|
||||
}
|
||||
|
||||
pub fn host(&self, name: String) -> Host {
|
||||
pub fn host(&self, name: String) -> Host<'_> {
|
||||
Host { dls: self, name }
|
||||
}
|
||||
|
||||
pub async fn get_json<T: serde::de::DeserializeOwned>(&self, path: impl Display) -> Result<T> {
|
||||
let req = self.get(&path)?.header("Accept", "application/json");
|
||||
pub async fn sign_dl_set(&self, req: &DownloadSetReq) -> Result<String> {
|
||||
let req = (self.req(Method::POST, "sign-download-set")?).json(req);
|
||||
self.req_json(req).await
|
||||
}
|
||||
pub async fn fetch_dl_set(
|
||||
&self,
|
||||
signed_dlset: &str,
|
||||
kind: &str,
|
||||
name: &str,
|
||||
asset: &str,
|
||||
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>> {
|
||||
let req = self.get(format!(
|
||||
"public/download-set/{kind}/{name}/{asset}?set={signed_dlset}"
|
||||
))?;
|
||||
let resp = do_req(req, &self.token).await?;
|
||||
Ok(resp.bytes_stream())
|
||||
}
|
||||
|
||||
let body = resp.bytes().await.map_err(Error::Read)?;
|
||||
serde_json::from_slice(&body).map_err(Error::Parse)
|
||||
pub async fn get_json<T: serde::de::DeserializeOwned>(&self, path: impl Display) -> Result<T> {
|
||||
self.req_json(self.get(&path)?).await
|
||||
}
|
||||
pub async fn get_bytes(&self, path: impl Display) -> Result<Vec<u8>> {
|
||||
let resp = do_req(self.get(&path)?, &self.token).await?;
|
||||
@@ -60,6 +76,16 @@ impl Client {
|
||||
self.req(Method::GET, path)
|
||||
}
|
||||
|
||||
pub async fn req_json<T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
req: reqwest::RequestBuilder,
|
||||
) -> Result<T> {
|
||||
let req = req.header("Accept", "application/json");
|
||||
let resp = do_req(req, &self.token).await?;
|
||||
|
||||
let body = resp.bytes().await.map_err(Error::Read)?;
|
||||
serde_json::from_slice(&body).map_err(Error::Parse)
|
||||
}
|
||||
pub fn req(&self, method: Method, path: impl Display) -> Result<reqwest::RequestBuilder> {
|
||||
let uri = format!("{}/{path}", self.base_url);
|
||||
|
||||
@@ -135,6 +161,32 @@ impl<'t> Host<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct Config {
|
||||
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||
pub clusters: Vec<ClusterConfig>,
|
||||
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||
pub hosts: Vec<HostConfig>,
|
||||
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||
pub host_templates: Vec<HostConfig>,
|
||||
#[serde(default, rename = "SSLConfig")]
|
||||
pub ssl_config: String,
|
||||
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||
pub extra_ca_certs: Map<String, String>,
|
||||
}
|
||||
|
||||
// compensate for go's encoder pitfalls
|
||||
use serde::{Deserialize, Deserializer};
|
||||
fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
|
||||
where
|
||||
T: Default + Deserialize<'de>,
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let opt = Option::deserialize(deserializer)?;
|
||||
Ok(opt.unwrap_or_default())
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct ClusterConfig {
|
||||
@@ -143,22 +195,36 @@ pub struct ClusterConfig {
|
||||
pub addons: String,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Default, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct HostConfig {
|
||||
pub name: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub cluster_name: Option<String>,
|
||||
|
||||
pub annotations: Map<String, String>,
|
||||
pub bootstrap_config: String,
|
||||
#[serde(rename = "IPXE")]
|
||||
pub ipxe: Option<String>,
|
||||
#[serde(rename = "IPs")]
|
||||
pub ips: Vec<IpAddr>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub labels: Map<String, String>,
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub annotations: Map<String, String>,
|
||||
|
||||
#[serde(rename = "IPXE", skip_serializing_if = "Option::is_none")]
|
||||
pub ipxe: Option<String>,
|
||||
|
||||
pub initrd: String,
|
||||
pub kernel: String,
|
||||
pub labels: Map<String, String>,
|
||||
pub versions: Map<String, String>,
|
||||
|
||||
/// initrd config template
|
||||
pub bootstrap_config: String,
|
||||
/// files to add to the final initrd config, with rendering
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub initrd_files: Vec<crate::File>,
|
||||
|
||||
/// system config template
|
||||
pub config: String,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
@@ -184,6 +250,23 @@ pub struct KubeSignReq {
|
||||
pub validity: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct DownloadSetReq {
|
||||
pub expiry: String,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
pub items: Vec<DownloadSetItem>,
|
||||
}
|
||||
|
||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct DownloadSetItem {
|
||||
pub kind: String,
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
pub assets: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct ServerError {
|
||||
#[serde(default)]
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
pub fn hash_password(salt: &[u8], passphrase: &str) -> argon2::Result<[u8; 32]> {
|
||||
let hash = argon2::hash_raw(
|
||||
passphrase.as_bytes(),
|
||||
salt,
|
||||
&argon2::Config {
|
||||
variant: argon2::Variant::Argon2id,
|
||||
hash_length: 32,
|
||||
time_cost: 1,
|
||||
mem_cost: 65536,
|
||||
thread_mode: argon2::ThreadMode::Parallel,
|
||||
lanes: 4,
|
||||
..Default::default()
|
||||
},
|
||||
)?;
|
||||
|
||||
unsafe { Ok(hash.try_into().unwrap_unchecked()) }
|
||||
}
|
||||
+188
@@ -0,0 +1,188 @@
|
||||
use eyre::{format_err, Result};
|
||||
use log::{debug, error, info, warn};
|
||||
use std::path::PathBuf;
|
||||
use tokio::{fs, io::AsyncWriteExt, process::Command};
|
||||
|
||||
use crate::fs::spawn_walk_dir;
|
||||
|
||||
pub struct Dynlay<'t> {
|
||||
pub url_prefix: &'t str,
|
||||
pub layers_dir: &'t str,
|
||||
pub chroot: PathBuf,
|
||||
}
|
||||
|
||||
impl<'t> Dynlay<'t> {
|
||||
pub async fn install(&self, layer: &str, version: &str) -> Result<()> {
|
||||
let lay_dir = &format!("{base}/{layer}", base = self.layers_dir);
|
||||
|
||||
debug!("mkdir -p {lay_dir}");
|
||||
fs::create_dir_all(lay_dir).await?;
|
||||
|
||||
let lay_path = &format!("{lay_dir}/{version}");
|
||||
|
||||
if !fs::try_exists(lay_path).await? {
|
||||
let part_file = &format!("{lay_dir}/{version}.tmp");
|
||||
|
||||
self.fetch(layer, version, part_file).await?;
|
||||
|
||||
(fs::rename(part_file, lay_path).await)
|
||||
.map_err(|e| format_err!("failed mv {part_file} {lay_path}: {e}"))?;
|
||||
}
|
||||
|
||||
let mount_path = PathBuf::from(lay_dir).join("mounts").join(layer);
|
||||
|
||||
(fs::create_dir_all(&mount_path).await)
|
||||
.map_err(|e| format_err!("mkdir -p {mount_path:?} failed: {e}"))?;
|
||||
|
||||
let mount_path = &fs::canonicalize(mount_path).await?;
|
||||
let mount_path_str = &mount_path.to_string_lossy().into_owned();
|
||||
|
||||
if is_mount_point(mount_path_str).await? {
|
||||
info!("clearing previous mount");
|
||||
|
||||
let mut paths = spawn_walk_dir(mount_path.clone());
|
||||
while let Some(result) = paths.recv().await {
|
||||
let Ok((path, md)) = result else {
|
||||
continue;
|
||||
};
|
||||
if !md.is_dir() {
|
||||
let path = self.chroot.join(&path);
|
||||
|
||||
debug!("rm {path:?}");
|
||||
if let Err(e) = fs::remove_file(&path).await {
|
||||
warn!("rm {path:?} failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sudo("umount", &[mount_path]).await?;
|
||||
}
|
||||
|
||||
// mount layer
|
||||
info!("mounting layer");
|
||||
sudo("mount", &[lay_path, &mount_path_str]).await?;
|
||||
|
||||
let mut paths = spawn_walk_dir(mount_path.clone());
|
||||
while let Some(result) = paths.recv().await {
|
||||
let Ok((path, md)) = result else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let target = self.chroot.join(&path);
|
||||
|
||||
if md.is_dir() {
|
||||
debug!("mkdir -p {target:?}");
|
||||
if let Err(e) = fs::create_dir_all(&target).await {
|
||||
error!("mkdir -p {target:?} failed: {e}");
|
||||
}
|
||||
} else {
|
||||
let _ = fs::remove_file(&target).await;
|
||||
|
||||
let source = mount_path.join(&path);
|
||||
|
||||
debug!("ln -s {source:?} {target:?}");
|
||||
if let Err(e) = fs::symlink(&source, &target).await {
|
||||
error!("ln -s {source:?} {target:?} failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch(&self, layer: &str, version: &str, part_file: &str) -> Result<()> {
|
||||
let url = &format!("{}/{layer}/{version}", self.url_prefix);
|
||||
info!("fetching {url}");
|
||||
|
||||
let mut out = (fs::File::create(part_file).await)
|
||||
.map_err(|e| format_err!("failed to open {part_file}: {e}"))?;
|
||||
|
||||
let resp = reqwest::get(url).await?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(format_err!("fetch failed: {}", resp.status()));
|
||||
}
|
||||
|
||||
let sha1 = (resp.headers().get("x-content-sha1"))
|
||||
.ok_or(format_err!("no content hash in response"))?;
|
||||
let sha1 = (sha1.to_str()).map_err(|e| format_err!("invalid sha1: {e}"))?;
|
||||
|
||||
debug!("content sha1: {sha1}");
|
||||
let mut exp_sha1 = [0; 20];
|
||||
hex::decode_to_slice(sha1, &mut exp_sha1).map_err(|e| format_err!("invalid sha1: {e}"))?;
|
||||
|
||||
let mut hash = openssl::sha::Sha1::new();
|
||||
|
||||
use futures::StreamExt;
|
||||
let mut stream = resp.bytes_stream();
|
||||
while let Some(bytes) = stream.next().await {
|
||||
let bytes = bytes.map_err(|e| format_err!("remote read error: {e}"))?;
|
||||
hash.update(&bytes);
|
||||
(out.write_all(&bytes).await).map_err(|e| format_err!("local write error: {e}"))?;
|
||||
}
|
||||
|
||||
(out.flush().await).map_err(|e| format_err!("local write error: {e}"))?;
|
||||
drop(out);
|
||||
|
||||
let dl_sha1 = hash.finish();
|
||||
if dl_sha1 != exp_sha1 {
|
||||
if let Err(e) = fs::remove_file(part_file).await {
|
||||
error!("failed to remove {part_file}: {e}");
|
||||
}
|
||||
return Err(format_err!(
|
||||
"invalid content hash: expected {exp}, got {got}",
|
||||
exp = hex::encode(exp_sha1),
|
||||
got = hex::encode(dl_sha1)
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn sudo<I, S>(program: &str, args: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: AsRef<std::ffi::OsStr>,
|
||||
{
|
||||
let mut cmd = if nix::unistd::geteuid().is_root() {
|
||||
let mut cmd = Command::new(program);
|
||||
cmd.args(args);
|
||||
cmd
|
||||
} else {
|
||||
let mut cmd = Command::new("sudo");
|
||||
cmd.arg(program).args(args);
|
||||
cmd
|
||||
};
|
||||
let status = cmd.status().await?;
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format_err!("{program} failed: {status}"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_mount_point(target: &str) -> Result<bool> {
|
||||
for line in fs::read_to_string("/proc/self/mounts").await?.lines() {
|
||||
let line = line.trim_ascii();
|
||||
if line.is_empty() || line.starts_with("#") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let split: Vec<_> = line.split_ascii_whitespace().collect();
|
||||
if split.len() < 6 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// let dev = split[0];
|
||||
let mount_point = split[1];
|
||||
// let fstype = split[2];
|
||||
// let mntops = split[3];
|
||||
// let fs_freq = split[4];
|
||||
// let fsck_passno = split[5];
|
||||
|
||||
if mount_point == target {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
use std::fs::Metadata;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub use tokio::fs::ReadDir;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("{0}: read dir: {1}")]
|
||||
ReadDir(PathBuf, std::io::Error),
|
||||
#[error("{0}: exists: {1}")]
|
||||
Exists(PathBuf, std::io::Error),
|
||||
#[error("{0}: read: {1}")]
|
||||
Read(PathBuf, std::io::Error),
|
||||
#[error("{0}: stat: {1}")]
|
||||
Stat(PathBuf, std::io::Error),
|
||||
#[error("{0}: create dir: {1}")]
|
||||
CreateDir(PathBuf, std::io::Error),
|
||||
#[error("{0}: write: {1}")]
|
||||
Write(PathBuf, std::io::Error),
|
||||
#[error("{0}: remove file: {1}")]
|
||||
RemoveFile(PathBuf, std::io::Error),
|
||||
#[error("{0}: symlink: {1}")]
|
||||
Symlink(PathBuf, std::io::Error),
|
||||
#[error("{0}: {1}")]
|
||||
Other(PathBuf, String),
|
||||
}
|
||||
|
||||
macro_rules! wrap_path {
|
||||
($fn:ident $( ( $( $pname:ident : $ptype:ty ),* ) )? -> $result:ty, $err:ident) => {
|
||||
pub async fn $fn(path: impl AsRef<Path>$($(, $pname: $ptype)*)?) -> Result<$result> {
|
||||
let path = path.as_ref();
|
||||
fs::$fn(path $($(, $pname)*)?).await.map_err(|e| Error::$err(path.into(), e))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
wrap_path!(read_dir -> ReadDir, ReadDir);
|
||||
wrap_path!(try_exists -> bool, Exists);
|
||||
wrap_path!(read -> Vec<u8>, Read);
|
||||
wrap_path!(read_to_string -> String, Read);
|
||||
wrap_path!(create_dir -> (), CreateDir);
|
||||
wrap_path!(create_dir_all -> (), CreateDir);
|
||||
wrap_path!(remove_file -> (), RemoveFile);
|
||||
wrap_path!(symlink(link_src: impl AsRef<Path>) -> (), Symlink);
|
||||
wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write);
|
||||
|
||||
pub fn spawn_walk_dir(
|
||||
dir: impl Into<PathBuf> + Send + 'static,
|
||||
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
tokio::spawn(walk_dir(dir, tx));
|
||||
rx
|
||||
}
|
||||
|
||||
pub async fn walk_dir(dir: impl Into<PathBuf>, tx: mpsc::Sender<Result<(PathBuf, Metadata)>>) {
|
||||
let dir: PathBuf = dir.into();
|
||||
|
||||
let mut todo = std::collections::LinkedList::new();
|
||||
if let Ok(rd) = read_dir(&dir).await {
|
||||
todo.push_front(rd);
|
||||
}
|
||||
|
||||
while let Some(rd) = todo.front_mut() {
|
||||
let entry = match rd.next_entry().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
if tx.send(Err(Error::ReadDir(dir.clone(), e))).await.is_err() {
|
||||
return;
|
||||
}
|
||||
todo.pop_front(); // skip dir on error
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(entry) = entry else {
|
||||
todo.pop_front();
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok(md) = entry.metadata().await else {
|
||||
continue;
|
||||
};
|
||||
let is_dir = md.is_dir();
|
||||
|
||||
let Ok(path) = entry.path().strip_prefix(&dir).map(|p| p.to_path_buf()) else {
|
||||
continue; // sub-entry not in dir, weird but semantically, we ignore
|
||||
};
|
||||
|
||||
if tx.send(Ok((path, md))).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
// recurse in sub directories
|
||||
if is_dir {
|
||||
if let Ok(rd) = read_dir(entry.path()).await {
|
||||
todo.push_front(rd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
use human_units::FormatSize;
|
||||
use std::fmt::{Display, Formatter, Result};
|
||||
|
||||
pub trait Human {
|
||||
fn human(&self) -> String;
|
||||
}
|
||||
|
||||
pub struct Quantity(u64);
|
||||
|
||||
impl Display for Quantity {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
|
||||
self.0.format_size().fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Human for Quantity {
|
||||
fn human(&self) -> String {
|
||||
self.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl Human for u64 {
|
||||
fn human(&self) -> String {
|
||||
self.format_size().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Human> Human for Option<T> {
|
||||
fn human(&self) -> String {
|
||||
match self {
|
||||
Some(h) => h.human(),
|
||||
None => "◌".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
+97
-13
@@ -1,9 +1,17 @@
|
||||
use std::borrow::Cow;
|
||||
|
||||
pub mod apply;
|
||||
pub mod bootstrap;
|
||||
pub mod cgroup;
|
||||
pub mod dls;
|
||||
pub mod dynlay;
|
||||
pub mod fs;
|
||||
pub mod human;
|
||||
pub mod logger;
|
||||
pub mod proxy;
|
||||
pub mod rc;
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Config {
|
||||
pub layers: Vec<String>,
|
||||
pub root_user: RootUser,
|
||||
@@ -17,18 +25,20 @@ pub struct Config {
|
||||
pub users: Vec<User>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct RootUser {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub password_hash: Option<String>,
|
||||
pub authorized_keys: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Mount {
|
||||
pub r#type: Option<String>,
|
||||
pub dev: String,
|
||||
pub path: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub r#type: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub options: Option<String>,
|
||||
}
|
||||
|
||||
@@ -48,19 +58,93 @@ pub struct User {
|
||||
pub gid: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[derive(Default, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||
pub struct File {
|
||||
pub path: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mode: Option<u32>,
|
||||
#[serde(flatten)]
|
||||
pub kind: FileKind,
|
||||
pub kind: Option<FileKind>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub content: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub content64: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub symlink: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub parts: Option<Vec<FilePart>>,
|
||||
#[serde(default, skip_serializing_if = "is_false")]
|
||||
pub dir: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum FileKind {
|
||||
Content(String),
|
||||
Symlink(String),
|
||||
Dir(bool),
|
||||
fn is_false(b: &bool) -> bool {
|
||||
!b
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum FileKind {
|
||||
#[default]
|
||||
Skip,
|
||||
Content(String),
|
||||
Content64(String),
|
||||
Parts(Vec<FilePart>),
|
||||
Symlink(String),
|
||||
Dir,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum FilePart {
|
||||
Content(String),
|
||||
Content64(String),
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
impl Config {
|
||||
pub fn has_file(&self, path: &str) -> bool {
|
||||
self.files.iter().any(|f| f.path == path)
|
||||
}
|
||||
pub fn file(&self, path: &str) -> Option<&File> {
|
||||
self.files.iter().find(|f| f.path == path)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
|
||||
use base64::{prelude::BASE64_STANDARD_NO_PAD as B64, Engine as _};
|
||||
B64.decode(s.trim_end_matches('='))
|
||||
}
|
||||
|
||||
pub fn base64_encode(b: &[u8]) -> String {
|
||||
use base64::{prelude::BASE64_STANDARD as B64, Engine as _};
|
||||
B64.encode(b)
|
||||
}
|
||||
|
||||
impl<'t> File {
|
||||
pub fn kind(&'t self) -> Cow<'t, FileKind> {
|
||||
self.kind.as_ref().map(Cow::Borrowed).unwrap_or_else(|| {
|
||||
use FileKind::*;
|
||||
Cow::Owned(if let Some(ref s) = self.content {
|
||||
Content(s.clone())
|
||||
} else if let Some(ref s) = self.content64 {
|
||||
Content64(s.clone())
|
||||
} else if let Some(ref s) = self.symlink {
|
||||
Symlink(s.clone())
|
||||
} else if let Some(ref p) = self.parts {
|
||||
Parts(p.clone())
|
||||
} else if self.dir {
|
||||
Dir
|
||||
} else {
|
||||
Skip
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_symlink(&self) -> bool {
|
||||
if let Some(ref kind) = self.kind {
|
||||
matches!(kind, FileKind::Symlink(_))
|
||||
} else {
|
||||
self.symlink.is_some()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+236
-93
@@ -2,16 +2,19 @@ use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
|
||||
use chrono::{DurationRound, TimeDelta, Utc};
|
||||
use eyre::{format_err, Result};
|
||||
use log::{debug, error, warn};
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use tokio::{
|
||||
fs::{self, File},
|
||||
fs::File,
|
||||
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
||||
process,
|
||||
process::{Child, Command},
|
||||
sync::mpsc,
|
||||
time::{sleep, Duration},
|
||||
};
|
||||
|
||||
use crate::{cgroup, fs};
|
||||
|
||||
pub type Timestamp = chrono::DateTime<Utc>;
|
||||
|
||||
const TS_FORMAT: &str = "%Y%m%d_%H";
|
||||
@@ -20,45 +23,92 @@ const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1);
|
||||
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1);
|
||||
|
||||
pub struct Logger<'t> {
|
||||
pub log_path: &'t str,
|
||||
pub log_name: &'t str,
|
||||
pub struct Logger {
|
||||
pub log_path: PathBuf,
|
||||
pub log_name: PathBuf,
|
||||
pub with_prefix: bool,
|
||||
pub cgroup: Option<String>,
|
||||
}
|
||||
|
||||
impl<'t> Logger<'t> {
|
||||
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
|
||||
impl Logger {
|
||||
pub async fn setup<I, S>(&self, command: impl AsRef<OsStr>, args: I) -> fs::Result<Command>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: AsRef<OsStr>,
|
||||
{
|
||||
// make sure we can at least open the log before starting the command
|
||||
let archives_path = &format!("{path}/archives", path = self.log_path);
|
||||
(fs::create_dir_all(archives_path).await)
|
||||
.map_err(|e| format_err!("failed to create archives dir: {e}"))?;
|
||||
let archives_read_dir = (fs::read_dir(archives_path).await)
|
||||
.map_err(|e| format_err!("failed to list archives: {e}"))?;
|
||||
let archives_path = &self.log_path.join("archives");
|
||||
fs::create_dir_all(archives_path).await?;
|
||||
|
||||
let mut prev_stamp = ts_trunc(Utc::now());
|
||||
let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?);
|
||||
let archives_read_dir = fs::read_dir(archives_path).await?;
|
||||
|
||||
let prev_stamp = trunc_ts(Utc::now());
|
||||
|
||||
tokio::spawn(compress_archives(
|
||||
archives_read_dir,
|
||||
self.log_name.to_string(),
|
||||
self.log_name.clone(),
|
||||
prev_stamp.format(TS_FORMAT).to_string(),
|
||||
));
|
||||
|
||||
// start the command
|
||||
let mut child = process::Command::new(command)
|
||||
.args(args)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
// create the command
|
||||
let mut cmd = Command::new(command);
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
cmd.args(args);
|
||||
|
||||
if let Some(cgroup) = self.cgroup.as_deref() {
|
||||
let cg_path = PathBuf::from(cgroup::ROOT)
|
||||
.join(cgroup)
|
||||
.join(&self.log_name);
|
||||
|
||||
fs::create_dir_all(&cg_path).await?;
|
||||
|
||||
let procs_file = cg_path.join("cgroup.procs");
|
||||
debug!("procs file {}", procs_file.display());
|
||||
|
||||
unsafe { cmd.pre_exec(move || std::fs::write(&procs_file, b"0")) };
|
||||
}
|
||||
|
||||
Ok(cmd)
|
||||
}
|
||||
|
||||
pub fn spawn(self, mut cmd: Command) -> std::io::Result<Child> {
|
||||
// setup outputs for capture
|
||||
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
|
||||
// spawn
|
||||
let mut child = cmd.spawn()?;
|
||||
|
||||
// capture outputs
|
||||
let (tx, rx) = mpsc::channel(8);
|
||||
|
||||
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
||||
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
||||
|
||||
// log outputs
|
||||
tokio::spawn(self.log_stream(rx));
|
||||
|
||||
Ok(child)
|
||||
}
|
||||
|
||||
// TODO: Result<!> when stable
|
||||
pub async fn exec(self, cmd: Command) -> Result<()> {
|
||||
let mut child = self.spawn(cmd)?;
|
||||
|
||||
// forward signals
|
||||
if let Some(child_pid) = child.id() {
|
||||
forward_signals_to(child_pid as i32);
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
std::process::exit(status.code().unwrap_or(-1));
|
||||
}
|
||||
|
||||
async fn log_stream(self, mut rx: mpsc::Receiver<LogItem>) {
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
|
||||
// handle output
|
||||
let mut prev_stamp = trunc_ts(Utc::now());
|
||||
let mut current_log = BufWriter::new(self.eventually_open_log(prev_stamp).await);
|
||||
|
||||
loop {
|
||||
tokio::select!(
|
||||
r = rx.recv() => {
|
||||
@@ -79,13 +129,10 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
|
||||
// finalize
|
||||
while let Err(e) = current_log.flush().await {
|
||||
error!("final log flush failed: {e}");
|
||||
while let Err(e) = current_log.shutdown().await {
|
||||
error!("final log shutdown failed: {e}");
|
||||
sleep(WRITE_RETRY_DELAY).await;
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
std::process::exit(status.code().unwrap_or(-1));
|
||||
}
|
||||
|
||||
async fn log_item(
|
||||
@@ -94,7 +141,7 @@ impl<'t> Logger<'t> {
|
||||
prev_stamp: &mut Timestamp,
|
||||
out: &mut BufWriter<File>,
|
||||
) -> Result<()> {
|
||||
let trunc_ts = ts_trunc(log.ts);
|
||||
let trunc_ts = trunc_ts(log.ts);
|
||||
if *prev_stamp < trunc_ts {
|
||||
// switch log
|
||||
out.flush().await?;
|
||||
@@ -112,9 +159,24 @@ impl<'t> Logger<'t> {
|
||||
}
|
||||
|
||||
out.write_all(&log.line).await?;
|
||||
if log.line.last() != Some(&b'\n') {
|
||||
out.write("↵\n".as_bytes()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn eventually_open_log(&self, ts: Timestamp) -> File {
|
||||
loop {
|
||||
match self.open_log(ts).await {
|
||||
Ok(log) => break log,
|
||||
Err(e) => {
|
||||
error!("open log failed: {e}");
|
||||
sleep(WRITE_RETRY_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn open_log(&self, ts: Timestamp) -> Result<File> {
|
||||
let log_file = &self.archive_path(ts);
|
||||
|
||||
@@ -125,36 +187,62 @@ impl<'t> Logger<'t> {
|
||||
.open(log_file)
|
||||
.await?;
|
||||
|
||||
let link_src = &format!(
|
||||
"{path}/{name}.log",
|
||||
path = self.log_path,
|
||||
name = self.log_name
|
||||
);
|
||||
let link_tgt = log_file
|
||||
.strip_prefix(&format!("{}/", self.log_path))
|
||||
.unwrap_or(log_file);
|
||||
let link_src = &self
|
||||
.log_path
|
||||
.join(&self.log_name)
|
||||
.with_added_extension("log");
|
||||
let link_tgt = &self.archive_rel_path(ts);
|
||||
|
||||
let _ = fs::remove_file(link_src).await;
|
||||
fs::symlink(link_tgt, link_src)
|
||||
.await
|
||||
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?;
|
||||
fs::symlink(link_tgt, link_src).await.map_err(|e| {
|
||||
format_err!(
|
||||
"symlink {s} -> {t} failed: {e}",
|
||||
s = link_src.display(),
|
||||
t = link_tgt.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
fn archive_path(&self, ts: Timestamp) -> String {
|
||||
format!(
|
||||
"{path}/archives/{file}",
|
||||
path = self.log_path,
|
||||
file = self.archive_file(ts)
|
||||
)
|
||||
fn archive_path(&self, ts: Timestamp) -> PathBuf {
|
||||
self.log_path.join(self.archive_rel_path(ts))
|
||||
}
|
||||
fn archive_file(&self, ts: Timestamp) -> String {
|
||||
format!(
|
||||
"{name}.{ts}.log",
|
||||
name = self.log_name,
|
||||
ts = ts.format(TS_FORMAT),
|
||||
)
|
||||
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
|
||||
PathBuf::from("archives").join(self.archive_file(ts))
|
||||
}
|
||||
fn archive_file(&self, ts: Timestamp) -> PathBuf {
|
||||
self.log_name
|
||||
.with_added_extension(ts.format(TS_FORMAT).to_string())
|
||||
.with_added_extension("log")
|
||||
}
|
||||
}
|
||||
|
||||
fn forward_signals_to(pid: i32) {
|
||||
use nix::{
|
||||
sys::signal::{kill, Signal},
|
||||
unistd::Pid,
|
||||
};
|
||||
use signal_hook::{consts::*, low_level::register};
|
||||
|
||||
debug!("forwarding signals to pid {pid}");
|
||||
|
||||
let pid = Pid::from_raw(pid);
|
||||
let signals = [
|
||||
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
|
||||
];
|
||||
|
||||
for sig in signals {
|
||||
let Ok(signal) = Signal::try_from(sig) else {
|
||||
continue;
|
||||
};
|
||||
unsafe {
|
||||
register(sig, move || {
|
||||
debug!("forwarding {signal} to {pid}");
|
||||
let _ = kill(pid, signal);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,41 +252,80 @@ struct LogItem {
|
||||
line: Vec<u8>,
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
stream_name: &'static str,
|
||||
out: impl AsyncRead + Unpin,
|
||||
tx: mpsc::UnboundedSender<LogItem>,
|
||||
) {
|
||||
let mut out = BufReader::new(out);
|
||||
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
|
||||
let buf_size = page_size::get();
|
||||
let mut out = BufReader::with_capacity(buf_size, out);
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
|
||||
macro_rules! send_line {
|
||||
() => {
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line: line.clone(),
|
||||
};
|
||||
if let Err(e) = tx.send(log).await {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
}
|
||||
line.clear();
|
||||
};
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut line = Vec::with_capacity(buf_size);
|
||||
if let Err(e) = out.read_until(b'\n', &mut line).await {
|
||||
warn!("read {stream_name} failed: {e}");
|
||||
return;
|
||||
}
|
||||
if line.is_empty() {
|
||||
let Ok(buf) = (out.fill_buf())
|
||||
.await
|
||||
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
|
||||
else {
|
||||
break;
|
||||
};
|
||||
|
||||
if buf.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let log = LogItem {
|
||||
stream_name,
|
||||
ts: chrono::Utc::now(),
|
||||
line,
|
||||
};
|
||||
if let Err(e) = tx.send(log) {
|
||||
warn!("send line failed: {e}");
|
||||
return;
|
||||
let remaining = buf_size - line.len();
|
||||
|
||||
if let Some(pos) = memchr::memchr(b'\n', buf) {
|
||||
let len = pos + 1;
|
||||
let mut buf = &buf[..len];
|
||||
|
||||
if len > remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
send_line!();
|
||||
buf = &buf[remaining..];
|
||||
}
|
||||
|
||||
line.extend_from_slice(buf);
|
||||
out.consume(len);
|
||||
send_line!();
|
||||
} else if buf.len() >= remaining {
|
||||
line.extend_from_slice(&buf[..remaining]);
|
||||
out.consume(remaining);
|
||||
send_line!();
|
||||
} else {
|
||||
line.extend_from_slice(buf);
|
||||
let len = buf.len();
|
||||
out.consume(len);
|
||||
}
|
||||
}
|
||||
|
||||
if !line.is_empty() {
|
||||
send_line!();
|
||||
}
|
||||
}
|
||||
|
||||
fn ts_trunc(ts: Timestamp) -> Timestamp {
|
||||
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
||||
ts.duration_trunc(TRUNC_DELTA)
|
||||
.expect("duration_trunc failed")
|
||||
}
|
||||
|
||||
async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) {
|
||||
async fn compress_archives(
|
||||
mut read_dir: fs::ReadDir,
|
||||
log_name: impl AsRef<Path>,
|
||||
exclude_ts: String,
|
||||
) {
|
||||
let log_name = log_name.as_ref();
|
||||
loop {
|
||||
let Ok(Some(entry)) =
|
||||
(read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}"))
|
||||
@@ -243,21 +370,21 @@ async fn compress(path: impl AsRef<Path>) {
|
||||
.await
|
||||
.map_err(|e| format_err!("open {path_str} failed: {e}"))?;
|
||||
|
||||
let out_path = path.with_extension("zstd");
|
||||
let out_path = path.with_extension("zst");
|
||||
let out = (File::create(&out_path).await) // create output
|
||||
.map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?;
|
||||
|
||||
let mut out = ZstdEncoder::new(out);
|
||||
async {
|
||||
tokio::io::copy(&mut input, &mut out).await?;
|
||||
out.flush().await
|
||||
out.shutdown().await
|
||||
}
|
||||
.await
|
||||
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
|
||||
|
||||
fs::remove_file(path)
|
||||
.await
|
||||
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
||||
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
||||
}
|
||||
.await;
|
||||
|
||||
@@ -267,32 +394,37 @@ async fn compress(path: impl AsRef<Path>) {
|
||||
}
|
||||
|
||||
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
|
||||
let dt =
|
||||
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
|
||||
let format = &format!("{TS_FORMAT}%M%S");
|
||||
let full_ts = &format!("{ts}0000");
|
||||
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
|
||||
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
|
||||
}
|
||||
|
||||
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
|
||||
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
|
||||
let mut dir = PathBuf::from(log_path);
|
||||
dir.push("archives");
|
||||
|
||||
let mut entries = Vec::new();
|
||||
let mut read_dir = fs::read_dir(dir).await?;
|
||||
let mut read_dir = fs::read_dir(&dir).await?;
|
||||
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
while let Some(entry) =
|
||||
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
|
||||
{
|
||||
let file_name = entry.file_name();
|
||||
let Some(file_name) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let (name, compressed) = file_name
|
||||
.strip_suffix(".zstd")
|
||||
.map_or((file_name, false), |s| (s, true));
|
||||
|
||||
let Some(name) = name.strip_suffix(".log") else {
|
||||
let Some((name, ext)) = file_name.rsplit_once('.') else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let compressed = match ext {
|
||||
"zst" => true,
|
||||
"log" => false,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
let Some((name, timestamp)) = name.rsplit_once('.') else {
|
||||
continue;
|
||||
};
|
||||
@@ -316,27 +448,38 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Ord)]
|
||||
#[derive(Debug)]
|
||||
pub struct LogFile {
|
||||
pub path: PathBuf,
|
||||
pub compressed: bool,
|
||||
pub timestamp: Timestamp,
|
||||
}
|
||||
|
||||
impl Eq for LogFile {}
|
||||
impl PartialEq for LogFile {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.timestamp == other.timestamp
|
||||
}
|
||||
}
|
||||
impl Ord for LogFile {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.timestamp.cmp(&other.timestamp)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for LogFile {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
self.timestamp.partial_cmp(&other.timestamp)
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl LogFile {
|
||||
pub async fn copy_to(&self, out: &mut (impl AsyncWrite + Unpin)) -> io::Result<u64> {
|
||||
let mut input = File::open(&self.path).await?;
|
||||
let input = &mut File::open(&self.path).await?;
|
||||
if self.compressed {
|
||||
let mut out = ZstdDecoder::new(out);
|
||||
tokio::io::copy(&mut input, &mut out).await
|
||||
let out = &mut ZstdDecoder::new(out);
|
||||
tokio::io::copy(input, out).await
|
||||
} else {
|
||||
tokio::io::copy(&mut input, out).await
|
||||
tokio::io::copy(input, out).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+136
@@ -0,0 +1,136 @@
|
||||
use log::{info, log_enabled, warn};
|
||||
use std::convert::Infallible;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::time;
|
||||
|
||||
pub struct Proxy {
|
||||
pub listen_addrs: Vec<SocketAddr>,
|
||||
pub targets: Vec<SocketAddr>,
|
||||
pub poll: Duration,
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("failed to listen on {0}: {1}")]
|
||||
ListenFailed(SocketAddr, std::io::Error),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl Proxy {
|
||||
pub async fn run(self) -> Result<Infallible> {
|
||||
let mut listeners = Vec::with_capacity(self.listen_addrs.len());
|
||||
for addr in self.listen_addrs {
|
||||
listeners.push(
|
||||
TcpListener::bind(&addr)
|
||||
.await
|
||||
.map_err(|e| Error::ListenFailed(addr, e))?,
|
||||
);
|
||||
info!("listening on {addr}");
|
||||
}
|
||||
|
||||
// all targets are initially ok (better land on a down one than just fail)
|
||||
let targets: Vec<_> = (self.targets.into_iter())
|
||||
.map(|addr| TargetStatus {
|
||||
addr,
|
||||
up: AtomicBool::new(true),
|
||||
timeout: self.timeout,
|
||||
})
|
||||
.collect();
|
||||
|
||||
// the proxy runs forever -> using 'static is not a leak
|
||||
let targets = targets.leak();
|
||||
|
||||
for listener in listeners {
|
||||
tokio::spawn(proxy_listener(listener, targets));
|
||||
}
|
||||
|
||||
check_targets(targets, self.poll).await
|
||||
}
|
||||
}
|
||||
|
||||
struct TargetStatus {
|
||||
addr: SocketAddr,
|
||||
up: AtomicBool,
|
||||
timeout: Duration,
|
||||
}
|
||||
impl TargetStatus {
|
||||
fn is_up(&self) -> bool {
|
||||
self.up.load(Relaxed)
|
||||
}
|
||||
|
||||
fn set_up(&self, is_up: bool) {
|
||||
let prev = self.up.swap(is_up, Relaxed);
|
||||
if prev != is_up {
|
||||
if is_up {
|
||||
info!("{} is up", self.addr);
|
||||
} else {
|
||||
warn!("{} is down", self.addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect(&self) -> io::Result<TcpStream> {
|
||||
let r = match time::timeout(self.timeout, TcpStream::connect(self.addr)).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
|
||||
};
|
||||
|
||||
self.set_up(r.is_ok());
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_targets(targets: &'static [TargetStatus], poll: Duration) -> ! {
|
||||
use tokio::time;
|
||||
let mut poll_ticker = time::interval(poll);
|
||||
poll_ticker.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
poll_ticker.tick().await;
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
for target in targets {
|
||||
tasks.spawn(target.connect());
|
||||
}
|
||||
|
||||
tasks.join_all().await;
|
||||
|
||||
if log_enabled!(log::Level::Info) {
|
||||
let mut infos = String::new();
|
||||
for ts in targets.iter() {
|
||||
infos.push_str(&format!("{} ", ts.addr));
|
||||
infos.push_str(if ts.is_up() { "up " } else { "down " });
|
||||
}
|
||||
info!("{infos}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn proxy_listener(listener: TcpListener, targets: &'static [TargetStatus]) {
|
||||
let mut rng = fastrand::Rng::new();
|
||||
|
||||
loop {
|
||||
let mut active = Vec::with_capacity(targets.len());
|
||||
let (mut src, _) = listener.accept().await.expect("listener.accept() failed");
|
||||
|
||||
active.extend((targets.iter().enumerate()).filter_map(|(i, ts)| ts.is_up().then_some(i)));
|
||||
rng.shuffle(&mut active);
|
||||
|
||||
tokio::spawn(async move {
|
||||
for i in active {
|
||||
if let Ok(mut dst) = targets[i].connect().await {
|
||||
let _ = tokio::io::copy_bidirectional(&mut src, &mut dst).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,495 @@
|
||||
use eyre::format_err;
|
||||
use log::{error, info, warn};
|
||||
use nix::sys::signal::Signal;
|
||||
use std::collections::{BTreeMap as Map, BTreeSet as Set};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::LazyLock;
|
||||
use tokio::{
|
||||
io::{copy, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
net::{UnixListener, UnixStream},
|
||||
sync::{mpsc, watch, RwLock},
|
||||
};
|
||||
|
||||
use crate::{cgroup, fs};
|
||||
|
||||
mod runner;
|
||||
|
||||
use runner::{Child, State};
|
||||
|
||||
const CFG_PATH: &str = "/etc/direktil/rc.yaml";
|
||||
const SOCK_PATH: &str = "/run/dkl-rc/ctl.sock"; // Path::new when stable
|
||||
|
||||
#[derive(Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Config {
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub cgroups: Map<String, CgroupConfig>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
pub struct CgroupConfig {
|
||||
pub controllers: String,
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub settings: Map<String, String>,
|
||||
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||
pub services: Map<String, Service>,
|
||||
}
|
||||
|
||||
pub type Service = Vec<String>;
|
||||
|
||||
static MANAGER: LazyLock<RwLock<Manager>> = LazyLock::new(|| RwLock::new(Manager::default()));
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum Error {
|
||||
#[error("invalid command: {0:?}")]
|
||||
InvalidCommand(String),
|
||||
#[error("config read failed: {0}")]
|
||||
ConfigRead(fs::Error),
|
||||
#[error("config parse failed: {0}")]
|
||||
ConfigParse(serde_yaml::Error),
|
||||
#[error("cgroup setup failed: {0}")]
|
||||
CgroupSetup(fs::Error),
|
||||
#[error("invalid key (cgroup/service)")]
|
||||
InvalidKey,
|
||||
#[error("unknown cgroup: {0:?}")]
|
||||
UnknownCgroup(String),
|
||||
#[error("unknown service: {0:?}")]
|
||||
UnknownService(String),
|
||||
#[error("invalid signal: {0:?}")]
|
||||
InvalidSignal(String),
|
||||
#[error("process exited")]
|
||||
ProcessExited,
|
||||
#[error("nothing running under {0:?}")]
|
||||
NotRunning(String),
|
||||
#[error("kill failed: {0:?}")]
|
||||
KillFailed(nix::Error),
|
||||
#[error("service runner is dead")]
|
||||
RunnerDead,
|
||||
}
|
||||
|
||||
pub async fn run() -> eyre::Result<()> {
|
||||
info!("starting");
|
||||
|
||||
tokio::spawn(wait_terminate());
|
||||
|
||||
let _ = reload_config().await;
|
||||
tokio::spawn(wait_reload());
|
||||
|
||||
if let Some(sock_dir) = PathBuf::from(SOCK_PATH).parent() {
|
||||
let _ = tokio::fs::DirBuilder::new()
|
||||
.mode(0o700)
|
||||
.create(sock_dir)
|
||||
.await;
|
||||
}
|
||||
|
||||
let _ = tokio::fs::remove_file(SOCK_PATH).await;
|
||||
let listener = UnixListener::bind(SOCK_PATH)?;
|
||||
|
||||
loop {
|
||||
let Ok((conn, _)) = listener.accept().await else {
|
||||
warn!("listener closed");
|
||||
break;
|
||||
};
|
||||
|
||||
tokio::spawn(async move { handle(conn).await });
|
||||
}
|
||||
|
||||
cleanup().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup() {
|
||||
let _ = tokio::fs::remove_file(SOCK_PATH).await;
|
||||
}
|
||||
|
||||
pub async fn ctl<I, S>(args: I) -> eyre::Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
let args: Vec<_> = args.into_iter().map(|s| s.into()).collect();
|
||||
let args = format!("{}\n", args.join(" "));
|
||||
|
||||
match ctl_exec(args.as_bytes()).await {
|
||||
Ok(mut rd) => {
|
||||
copy(&mut rd, &mut tokio::io::stdout()).await?;
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
eprint!("{e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn ctl_exec(request: &[u8]) -> eyre::Result<BufReader<UnixStream>> {
|
||||
let mut conn = UnixStream::connect(SOCK_PATH)
|
||||
.await
|
||||
.map_err(|e| format_err!("{SOCK_PATH}: {e}"))?;
|
||||
|
||||
conn.write_all(request).await?;
|
||||
|
||||
let mut rd = BufReader::with_capacity(64, conn);
|
||||
|
||||
let mut code = String::new();
|
||||
rd.read_line(&mut code).await?;
|
||||
let code: i32 = code.trim_ascii().parse()?;
|
||||
|
||||
if code != 0 {
|
||||
let mut err = String::new();
|
||||
rd.read_to_string(&mut err).await?;
|
||||
return Err(format_err!("{}", err.trim_ascii_end()));
|
||||
}
|
||||
|
||||
Ok(rd)
|
||||
}
|
||||
|
||||
async fn handle(mut conn: UnixStream) {
|
||||
let (rd, mut wr) = conn.split();
|
||||
let mut rd = BufReader::with_capacity(64, rd).lines();
|
||||
|
||||
let Ok(Some(line)) = rd.next_line().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut line = line.split_ascii_whitespace();
|
||||
|
||||
macro_rules! next {
|
||||
() => {{
|
||||
match line.next() {
|
||||
Some(v) => v,
|
||||
None => return,
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
let r = match next!() {
|
||||
"ls" => Ok(Some(ls().await)),
|
||||
"status" => Ok(Some(status().await)),
|
||||
"reload-config" => reload_config().await.map(|_| None),
|
||||
"start" => start(next!()).await.map(|_| None),
|
||||
"stop" => stop(next!()).await.map(|_| None),
|
||||
"reload" => reload(next!()).await.map(|_| None),
|
||||
"sig" => sig(next!(), next!()).await.map(|_| None),
|
||||
cmd => Err(Error::InvalidCommand(cmd.into())),
|
||||
};
|
||||
|
||||
let _ = match r {
|
||||
Ok(None) => wr.write_all(b"0\n").await,
|
||||
Ok(Some(s)) => wr.write_all(format!("0\n{s}\n").as_bytes()).await,
|
||||
Err(e) => wr.write_all(format!("1\n{e}\n").as_bytes()).await,
|
||||
};
|
||||
|
||||
let _ = wr.shutdown().await;
|
||||
}
|
||||
|
||||
async fn wait_terminate() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
let Ok(mut sig) = signal(SignalKind::terminate())
|
||||
.inspect_err(|e| error!("failed to listen to SIGTERM (will be ignored): {e}"))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
sig.recv().await;
|
||||
|
||||
info!("SIGTERM received, terminating");
|
||||
MANAGER.write().await.terminate().await;
|
||||
|
||||
cleanup().await;
|
||||
log::logger().flush();
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
async fn wait_reload() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
let Ok(mut sig) = signal(SignalKind::hangup())
|
||||
.inspect_err(|e| error!("failed to listen to SIGHUP (will be ignored): {e}"))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
loop {
|
||||
sig.recv().await;
|
||||
let _ = reload_config().await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn reload_config() -> Result<()> {
|
||||
let cfg = (fs::read(CFG_PATH).await)
|
||||
.map_err(Error::ConfigRead)
|
||||
.inspect_err(|e| error!("{e}"))?;
|
||||
|
||||
let cfg = serde_yaml::from_slice::<Config>(&cfg)
|
||||
.map_err(Error::ConfigParse)
|
||||
.inspect_err(|e| error!("{CFG_PATH}: {e}"))?;
|
||||
|
||||
info!("applying new config");
|
||||
let r = MANAGER.write().await.apply_config(cfg).await;
|
||||
match &r {
|
||||
Ok(_) => info!("applied new config"),
|
||||
Err(e) => info!("failed to apply new config: {e}"),
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
async fn ls() -> String {
|
||||
let mut keys = String::new();
|
||||
for (i, k) in MANAGER.read().await.runners.keys().enumerate() {
|
||||
if i != 0 {
|
||||
keys.push('\n');
|
||||
}
|
||||
keys.push_str(k);
|
||||
}
|
||||
keys
|
||||
}
|
||||
|
||||
async fn status() -> String {
|
||||
let status = MANAGER.read().await.status();
|
||||
|
||||
let mut table = tabled::builder::Builder::new();
|
||||
table.push_record(["cgroup", "service", "PID", "state", "msg"]);
|
||||
|
||||
for (cg_svc, child) in status {
|
||||
let (cg, svc) = cg_svc.split_once('/').unwrap();
|
||||
let pid = child.pid.map(|p| p.to_string());
|
||||
table.push_record([
|
||||
cg,
|
||||
svc,
|
||||
pid.as_deref().unwrap_or("◌"),
|
||||
&format!("{:?}", child.state),
|
||||
child.msg.as_deref().unwrap_or("◌"),
|
||||
]);
|
||||
}
|
||||
|
||||
(table.build())
|
||||
.with(tabled::settings::Style::psql())
|
||||
.to_string()
|
||||
}
|
||||
|
||||
async fn start(key: &str) -> Result<()> {
|
||||
MANAGER.write().await.start(key).await
|
||||
}
|
||||
|
||||
async fn stop(key: &str) -> Result<()> {
|
||||
MANAGER.write().await.stop(key).await
|
||||
}
|
||||
|
||||
async fn reload(key: &str) -> Result<()> {
|
||||
MANAGER.read().await.reload(key).await
|
||||
}
|
||||
|
||||
async fn sig(key: &str, sig: &str) -> Result<()> {
|
||||
let sig: Signal = sig.parse().map_err(|_| Error::InvalidSignal(sig.into()))?;
|
||||
signal(key, sig).await
|
||||
}
|
||||
|
||||
async fn child_for(key: &str) -> Result<Child> {
|
||||
MANAGER.read().await.child_for(key)
|
||||
}
|
||||
|
||||
async fn signal(key: &str, sig: Signal) -> Result<()> {
|
||||
child_for(key).await?.kill(sig)
|
||||
}
|
||||
|
||||
fn child_key(cg: &str, svc: &str) -> String {
|
||||
[cg, svc].join("/")
|
||||
}
|
||||
fn split_key(key: &str) -> Result<(&str, &str)> {
|
||||
key.split_once('/').ok_or(Error::InvalidKey)
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Manager {
|
||||
cfg: Config,
|
||||
procs: Map<String, watch::Receiver<Child>>,
|
||||
runners: Map<String, mpsc::Sender<runner::Cmd>>,
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
fn status(&self) -> Vec<(String, Child)> {
|
||||
(self.procs.iter())
|
||||
.map(|(n, c)| (n.clone(), c.borrow().clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn child_for(&self, key: &str) -> Result<Child> {
|
||||
(self.procs.get(key))
|
||||
.map(|c| c.borrow().clone())
|
||||
.ok_or_else(|| Error::NotRunning(key.into()))
|
||||
}
|
||||
|
||||
async fn apply_config(&mut self, new_cfg: Config) -> Result<()> {
|
||||
// create and configure cgroups
|
||||
for (name, cg) in &new_cfg.cgroups {
|
||||
let cg_path = PathBuf::from(cgroup::ROOT).join(name);
|
||||
fs::create_dir_all(&cg_path)
|
||||
.await
|
||||
.map_err(Error::CgroupSetup)?;
|
||||
|
||||
fs::write(
|
||||
cg_path.join("cgroup.subtree_control"),
|
||||
cg.controllers.as_bytes(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::CgroupSetup)?;
|
||||
|
||||
for (setting, value) in &cg.settings {
|
||||
fs::write(cg_path.join(setting), value.as_bytes())
|
||||
.await
|
||||
.map_err(Error::CgroupSetup)?;
|
||||
}
|
||||
}
|
||||
|
||||
let new_svcs: Set<_> = new_cfg.service_keys().collect();
|
||||
|
||||
// stop removed services
|
||||
let to_stop = Map::from_iter(self.runners.extract_if(.., |k, _| !new_svcs.contains(k)));
|
||||
let mut stopped = Set::new();
|
||||
for (key, runner_cmd) in to_stop {
|
||||
if runner_cmd.send(runner::Cmd::Stop).await.is_err() {
|
||||
// runner already dead
|
||||
continue;
|
||||
}
|
||||
stopped.insert(key);
|
||||
}
|
||||
|
||||
// start added services
|
||||
for (key, cg, svc, service) in new_cfg.services() {
|
||||
if self.runners.contains_key(&key) {
|
||||
continue;
|
||||
};
|
||||
|
||||
let cmd = self.spawn_runner(key, cg, svc, service.clone());
|
||||
if let Err(e) = cmd.send(runner::Cmd::Start).await {
|
||||
error!("runner instantly died: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// wait & cleanup stopped
|
||||
for key in stopped {
|
||||
let Some(mut child_rx) = self.procs.remove(&key) else {
|
||||
continue;
|
||||
};
|
||||
let _ = child_rx
|
||||
.wait_for(|c| matches!(c.state, State::Finalized))
|
||||
.await;
|
||||
}
|
||||
|
||||
self.cfg = new_cfg;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn terminate(&mut self) {
|
||||
self.runners.clear();
|
||||
|
||||
for child in self.procs.values_mut() {
|
||||
let _ = child
|
||||
.wait_for(|c| matches!(c.state, State::Finalized))
|
||||
.await;
|
||||
}
|
||||
|
||||
self.procs.clear();
|
||||
}
|
||||
|
||||
fn runner(&mut self, key: &str) -> Result<mpsc::Sender<runner::Cmd>> {
|
||||
if let Some(c) = self.runners.get(key) {
|
||||
return Ok(c.clone());
|
||||
}
|
||||
|
||||
let (cg, svc) = split_key(key)?;
|
||||
let service = self.cfg.service(key)?;
|
||||
|
||||
Ok(self.spawn_runner(key.into(), cg, svc, service.clone()))
|
||||
}
|
||||
|
||||
fn spawn_runner(
|
||||
&mut self,
|
||||
key: String,
|
||||
cg: &str,
|
||||
svc: &str,
|
||||
service: Service,
|
||||
) -> mpsc::Sender<runner::Cmd> {
|
||||
let (runner, child_rx, cmds_tx) = runner::new(cg, svc, service);
|
||||
|
||||
tokio::spawn(runner.run());
|
||||
|
||||
self.procs.insert(key.clone(), child_rx);
|
||||
self.runners.insert(key, cmds_tx.clone());
|
||||
|
||||
cmds_tx
|
||||
}
|
||||
|
||||
async fn cmd(&mut self, key: &str, cmd: runner::Cmd) -> Result<()> {
|
||||
if self.runner(key)?.send(cmd).await.is_err() {
|
||||
// runner died
|
||||
self.runners.remove(key);
|
||||
return Err(Error::RunnerDead);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start(&mut self, key: &str) -> Result<()> {
|
||||
self.cmd(key, runner::Cmd::Start).await
|
||||
}
|
||||
|
||||
async fn stop(&mut self, key: &str) -> Result<()> {
|
||||
self.cmd(key, runner::Cmd::Stop).await
|
||||
}
|
||||
|
||||
async fn reload(&self, key: &str) -> Result<()> {
|
||||
let proc = (self.procs.get(key)) //
|
||||
.ok_or_else(|| Error::UnknownService(key.into()))?;
|
||||
proc.borrow().reload()
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn cgroup(&self, cg: &str) -> Result<&CgroupConfig> {
|
||||
self.cgroups
|
||||
.get(cg)
|
||||
.ok_or_else(|| Error::UnknownCgroup(cg.into()))
|
||||
}
|
||||
|
||||
fn service(&self, key: &str) -> Result<&Service> {
|
||||
let (cg, svc) = split_key(key)?;
|
||||
self.cgroup(cg)?.service(svc)
|
||||
}
|
||||
|
||||
fn service_keys(&self) -> impl Iterator<Item = String> {
|
||||
(self.cgroups.iter())
|
||||
.map(|(cg_name, cg)| cg.services.keys().map(move |n| child_key(cg_name, n)))
|
||||
.flatten()
|
||||
}
|
||||
|
||||
fn services(&self) -> impl Iterator<Item = (String, &String, &String, &Service)> {
|
||||
(self.cgroups.iter())
|
||||
.map(|(cg_name, cg)| {
|
||||
cg.services
|
||||
.iter()
|
||||
.map(move |(n, service)| (child_key(cg_name, n), cg_name, n, service))
|
||||
})
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
|
||||
impl CgroupConfig {
|
||||
fn service(&self, svc: &str) -> Result<&Vec<String>> {
|
||||
self.services
|
||||
.get(svc)
|
||||
.ok_or_else(|| Error::UnknownService(svc.into()))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn complete() -> Vec<String> {
|
||||
let mut r = vec![];
|
||||
let Ok(rd) = ctl_exec(b"ls\n").await else {
|
||||
return r;
|
||||
};
|
||||
let mut rd = rd.lines();
|
||||
while let Some(line) = rd.next_line().await.ok().flatten() {
|
||||
r.push(line);
|
||||
}
|
||||
r
|
||||
}
|
||||
@@ -0,0 +1,263 @@
|
||||
use log::{error, warn};
|
||||
use nix::{
|
||||
sys::signal::{kill, Signal},
|
||||
unistd::Pid,
|
||||
};
|
||||
use std::num::NonZero;
|
||||
use tokio::{
|
||||
process, select,
|
||||
sync::{mpsc, watch},
|
||||
time::{sleep, sleep_until, Duration, Instant},
|
||||
};
|
||||
|
||||
use super::{Error, Result, Service};
|
||||
use crate::logger::Logger;
|
||||
|
||||
const LOG_PATH: &str = "/var/log";
|
||||
|
||||
const TERM_DELAY: Duration = Duration::from_secs(30);
|
||||
const KILL_DELAY: Duration = Duration::from_secs(10);
|
||||
const RESTART_DELAY: Duration = Duration::from_secs(8);
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum Cmd {
|
||||
Start,
|
||||
Stop,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Copy, Debug)]
|
||||
pub enum State {
|
||||
#[default]
|
||||
NeverStarted,
|
||||
Starting,
|
||||
Running,
|
||||
Crashed,
|
||||
Stopping,
|
||||
Stopped,
|
||||
Finalized,
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
cg: impl Into<String>,
|
||||
svc: impl Into<String>,
|
||||
service: Service,
|
||||
) -> (Runner, watch::Receiver<Child>, mpsc::Sender<Cmd>) {
|
||||
let (manager, child_rx) = ProcessManager::new(service);
|
||||
let (cmds_tx, cmds_rx) = mpsc::channel(1);
|
||||
let r = Runner {
|
||||
cg: cg.into(),
|
||||
svc: svc.into(),
|
||||
cmds_rx,
|
||||
manager,
|
||||
};
|
||||
(r, child_rx, cmds_tx)
|
||||
}
|
||||
|
||||
pub struct Runner {
|
||||
cg: String,
|
||||
svc: String,
|
||||
cmds_rx: mpsc::Receiver<Cmd>,
|
||||
manager: ProcessManager,
|
||||
}
|
||||
|
||||
impl Runner {
|
||||
pub async fn run(mut self) {
|
||||
self.manager.update(State::NeverStarted);
|
||||
|
||||
loop {
|
||||
let cmd = select! {
|
||||
cmd = self.manager.manage() => {
|
||||
cmd
|
||||
}
|
||||
cmd = self.cmds_rx.recv() => {
|
||||
let Some(cmd) = cmd else {
|
||||
break; // command side dropped
|
||||
};
|
||||
Some(cmd)
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(cmd) = cmd {
|
||||
self.process_cmd(cmd).await;
|
||||
}
|
||||
}
|
||||
|
||||
self.process_cmd(Cmd::Stop).await;
|
||||
self.manager.update(State::Finalized);
|
||||
}
|
||||
|
||||
async fn process_cmd(&mut self, cmd: Cmd) {
|
||||
let cg = &self.cg;
|
||||
let svc = &self.svc;
|
||||
|
||||
match cmd {
|
||||
Cmd::Start => {
|
||||
self.manager.start(cg, svc).await;
|
||||
}
|
||||
Cmd::Stop => {
|
||||
self.manager.stop().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ProcessManager {
|
||||
service: Service,
|
||||
child_tx: watch::Sender<Child>,
|
||||
process: Option<process::Child>,
|
||||
restart_deadline: Option<Instant>,
|
||||
}
|
||||
|
||||
impl ProcessManager {
|
||||
fn new(service: Service) -> (Self, watch::Receiver<Child>) {
|
||||
let (child_tx, child_rx) = watch::channel(Child::default());
|
||||
let pm = Self {
|
||||
service,
|
||||
child_tx,
|
||||
process: None,
|
||||
restart_deadline: None,
|
||||
};
|
||||
(pm, child_rx)
|
||||
}
|
||||
|
||||
/// runs a management iteration (ie: waiting for the child or a restart deadline).
|
||||
async fn manage(&mut self) -> Option<Cmd> {
|
||||
if let Some(process) = self.process.as_mut() {
|
||||
let msg = match process.wait().await {
|
||||
Ok(status) => status.to_string(),
|
||||
Err(e) => e.to_string(),
|
||||
};
|
||||
self.crashed(msg);
|
||||
self.process = None;
|
||||
self.restart_deadline = Some(Instant::now() + RESTART_DELAY);
|
||||
None
|
||||
} else if let Some(deadline) = self.restart_deadline {
|
||||
sleep_until(deadline).await;
|
||||
Some(Cmd::Start)
|
||||
} else {
|
||||
std::future::pending().await
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(&mut self, cg: &str, svc: &str) {
|
||||
if self.process.is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
self.update(State::Starting);
|
||||
|
||||
let logger = Logger {
|
||||
log_path: LOG_PATH.into(),
|
||||
log_name: svc.into(),
|
||||
with_prefix: false,
|
||||
cgroup: Some(cg.into()),
|
||||
};
|
||||
|
||||
let mut args = self.service.iter();
|
||||
let Some(cmd) = args.next() else {
|
||||
error!("{cg}/{svc}: empty command");
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(cmd) = (logger.setup(cmd, args).await)
|
||||
.inspect_err(|e| self.crashed(format!("setup failed: {e}")))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(child) = logger
|
||||
.spawn(cmd)
|
||||
.inspect_err(|e| self.crashed(format!("exec failed: {e}")))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
self.process = Some(child);
|
||||
self.restart_deadline = None;
|
||||
self.update(State::Running);
|
||||
}
|
||||
|
||||
async fn stop(&mut self) {
|
||||
self.restart_deadline = None;
|
||||
|
||||
let Some(mut process) = self.process.take() else {
|
||||
return;
|
||||
};
|
||||
let Some(pid) = process.id() else {
|
||||
let _ = process.wait().await; // already dead, reap it
|
||||
self.update(State::Stopped);
|
||||
return;
|
||||
};
|
||||
|
||||
let pid = pid as i32;
|
||||
self.update_full(pid, State::Stopping, None);
|
||||
let pid = Pid::from_raw(pid);
|
||||
|
||||
let _ = kill(pid, Signal::SIGTERM).inspect_err(|e| error!("kill -TERM {pid} failed: {e}"));
|
||||
|
||||
select! {
|
||||
_ = process.wait() => {
|
||||
self.update(State::Stopped);
|
||||
return
|
||||
},
|
||||
_ = sleep(TERM_DELAY) => {
|
||||
warn!("process {pid} did not exit during the grace period, killing");
|
||||
let _ = process.kill().await.inspect_err(|e| error!("kill -KILL {pid} failed: {e}"));
|
||||
}
|
||||
}
|
||||
|
||||
select! {
|
||||
_ = process.wait() => {
|
||||
self.update(State::Stopped);
|
||||
return
|
||||
},
|
||||
_ = sleep(KILL_DELAY) => {
|
||||
error!("process {pid} still alive after SIGKILL");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_pid(&self) -> i32 {
|
||||
(self.process.as_ref())
|
||||
.and_then(|c| Some(c.id()? as i32))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn update(&self, state: State) {
|
||||
let pid = self.process_pid();
|
||||
self.update_full(pid, state, None);
|
||||
}
|
||||
|
||||
fn update_full(&self, pid: i32, state: State, msg: Option<String>) {
|
||||
self.child_tx.send_replace(Child {
|
||||
pid: NonZero::new(pid),
|
||||
state,
|
||||
msg,
|
||||
});
|
||||
}
|
||||
|
||||
fn crashed(&self, msg: String) {
|
||||
let pid = self.process_pid();
|
||||
self.update_full(pid, State::Crashed, Some(msg));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Child {
|
||||
pub pid: Option<NonZero<i32>>,
|
||||
pub state: State,
|
||||
pub msg: Option<String>,
|
||||
}
|
||||
|
||||
impl Child {
|
||||
pub fn reload(&self) -> Result<()> {
|
||||
self.kill(Signal::SIGHUP)
|
||||
}
|
||||
|
||||
pub fn kill(&self, sig: Signal) -> Result<()> {
|
||||
let Some(pid) = self.pid else {
|
||||
return Err(Error::ProcessExited);
|
||||
};
|
||||
kill(Pid::from_raw(pid.into()), sig).map_err(|e| Error::KillFailed(e))
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@ set -ex
|
||||
|
||||
dkl=target/debug/dkl
|
||||
|
||||
test=${1:-log}
|
||||
test=${1:-dynlay}
|
||||
|
||||
export RUST_LOG=debug
|
||||
|
||||
@@ -19,8 +19,20 @@ case $test in
|
||||
cat tmp/log/bash.log
|
||||
;;
|
||||
|
||||
log)
|
||||
$dkl log --log-path tmp/log bash 20250720_12 20250720_16
|
||||
log-ls)
|
||||
$dkl log --log-path tmp/log bash ls -l
|
||||
;;
|
||||
log-cat)
|
||||
$dkl log --log-path tmp/log bash cat 20250720_12 20301231_23
|
||||
;;
|
||||
log-clean)
|
||||
$dkl log --log-path tmp/log bash ls -l
|
||||
$dkl log --log-path tmp/log bash cleanup 0
|
||||
;;
|
||||
|
||||
dynlay)
|
||||
mkdir -p tmp/system
|
||||
$dkl dynlay --layers-dir tmp/dynlay --chroot tmp/system kubernetes v1.33.3_containerd.2.0.5
|
||||
;;
|
||||
|
||||
*) echo 1>&2 "unknown test: $test"; exit 1 ;;
|
||||
|
||||
@@ -18,3 +18,12 @@ $dls cluster cluster ssh-sign ~/.ssh/id_ed25519.pub
|
||||
$dls host m1 | jq '{Name, ClusterName, IPs}'
|
||||
$dls host m1 bootstrap-config
|
||||
|
||||
export DLS_DLSET=$($dls dl-set sign --expiry 1d \
|
||||
cluster:cluster:addons \
|
||||
host:m1:kernel:initrd:bootstrap.tar \
|
||||
host:m2:config:bootstrap-config:boot.vmdk)
|
||||
|
||||
$dls dl-set show
|
||||
$dls dl-set fetch host m2 bootstrap-config
|
||||
rm host_m2_bootstrap-config
|
||||
|
||||
|
||||
Reference in New Issue
Block a user