Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6059d81b3d | ||
|
|
f3b3a9b9c7 | ||
|
|
a5026b884d | ||
|
|
c19798f9f0 | ||
|
|
dc936f52ab | ||
|
|
33fcfbd197 | ||
|
|
0f116e21b9 | ||
|
|
aa7f15516c | ||
|
|
4619899e65 | ||
|
|
4b1edb2a55 | ||
|
|
d449fc8dcf | ||
|
|
ddc82199fb | ||
|
|
61d31bc22c | ||
|
|
d2293df011 | ||
|
|
723cecff1b | ||
|
|
e8c9ee9885 | ||
|
|
6a6536bdfb | ||
|
|
a6dc420275 | ||
|
|
d9fa31ec33 | ||
|
|
93e5570293 | ||
|
|
fb3f8942d4 | ||
|
|
7acc9e9a3e | ||
|
|
ac90b35142 | ||
|
|
298366a0aa | ||
|
|
ecbbb82c7a | ||
|
|
ebd2f21d42 |
1022
Cargo.lock
generated
1022
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
14
Cargo.toml
14
Cargo.toml
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "dkl"
|
name = "dkl"
|
||||||
version = "1.0.0"
|
version = "1.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
@@ -13,25 +13,33 @@ codegen-units = 1
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
|
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
|
||||||
base32 = "0.5.1"
|
base32 = "0.5.1"
|
||||||
|
base64 = "0.22.1"
|
||||||
bytes = "1.10.1"
|
bytes = "1.10.1"
|
||||||
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
|
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
|
||||||
clap = { version = "4.5.40", features = ["derive", "env"] }
|
clap = { version = "4.5.40", features = ["derive", "env"] }
|
||||||
clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] }
|
clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] }
|
||||||
env_logger = "0.11.8"
|
env_logger = "0.11.8"
|
||||||
eyre = "0.6.12"
|
eyre = "0.6.12"
|
||||||
|
fastrand = "2.3.0"
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
futures-util = "0.3.31"
|
futures-util = "0.3.31"
|
||||||
glob = "0.3.2"
|
glob = "0.3.2"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
|
human-units = "0.5.3"
|
||||||
log = "0.4.27"
|
log = "0.4.27"
|
||||||
lz4 = "1.28.1"
|
lz4 = "1.28.1"
|
||||||
nix = { version = "0.30.1", features = ["user"] }
|
memchr = "2.8.0"
|
||||||
|
nix = { version = "0.31.2", features = ["process", "signal", "user"] }
|
||||||
openssl = "0.10.73"
|
openssl = "0.10.73"
|
||||||
page_size = "0.6.0"
|
page_size = "0.6.0"
|
||||||
reqwest = { version = "0.12.20", features = ["json", "stream", "native-tls"] }
|
reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false }
|
||||||
|
rpassword = "7.4.0"
|
||||||
|
rust-argon2 = "3.0.0"
|
||||||
serde = { version = "1.0.219", features = ["derive"] }
|
serde = { version = "1.0.219", features = ["derive"] }
|
||||||
serde_json = "1.0.140"
|
serde_json = "1.0.140"
|
||||||
serde_yaml = "0.9.34"
|
serde_yaml = "0.9.34"
|
||||||
|
signal-hook = "0.4.4"
|
||||||
|
tabled = "0.20.0"
|
||||||
thiserror = "2.0.12"
|
thiserror = "2.0.12"
|
||||||
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
|
tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] }
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from mcluseau/rust:1.88.0 as build
|
from mcluseau/rust:1.94.0 as build
|
||||||
|
|
||||||
workdir /app
|
workdir /app
|
||||||
copy . .
|
copy . .
|
||||||
@@ -10,6 +10,6 @@ run \
|
|||||||
&& find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ +
|
&& find target/release -maxdepth 1 -type f -executable -exec cp -v {} /dist/ +
|
||||||
|
|
||||||
# ------------------------------------------------------------------------
|
# ------------------------------------------------------------------------
|
||||||
from alpine:3.22
|
from alpine:3.23
|
||||||
copy --from=build /dist/ /bin/
|
copy --from=build /dist/ /bin/
|
||||||
|
|
||||||
|
|||||||
17
release.sh
Executable file
17
release.sh
Executable file
@@ -0,0 +1,17 @@
|
|||||||
|
set -ex
|
||||||
|
tag=$(git describe --always)
|
||||||
|
repo=novit.tech/direktil/dkl:$tag
|
||||||
|
|
||||||
|
docker build --push --platform=linux/amd64,linux/arm64 . -t $repo
|
||||||
|
|
||||||
|
publish() {
|
||||||
|
arch=$1
|
||||||
|
pf=$2
|
||||||
|
|
||||||
|
curl --user $(jq '.auths["novit.tech"].auth' ~/.docker/config.json -r |base64 -d) \
|
||||||
|
--upload-file <(docker run --rm --platform $pf $repo cat /bin/dkl) \
|
||||||
|
https://novit.tech/api/packages/direktil/generic/dkl/$tag/dkl.$arch
|
||||||
|
}
|
||||||
|
|
||||||
|
publish x86_64 linux/amd64
|
||||||
|
publish arm64 linux/arm64
|
||||||
50
src/apply.rs
50
src/apply.rs
@@ -3,21 +3,61 @@ use log::info;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
pub async fn files(files: &[crate::File], root: &str) -> Result<()> {
|
use crate::base64_decode;
|
||||||
|
|
||||||
|
pub async fn files(files: &[crate::File], root: &str, dry_run: bool) -> Result<()> {
|
||||||
for file in files {
|
for file in files {
|
||||||
let path = chroot(root, &file.path);
|
let path = chroot(root, &file.path);
|
||||||
let path = Path::new(&path);
|
let path = Path::new(&path);
|
||||||
|
|
||||||
if let Some(parent) = path.parent() {
|
if !dry_run && let Some(parent) = path.parent() {
|
||||||
fs::create_dir_all(parent).await?;
|
fs::create_dir_all(parent).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
use crate::FileKind as K;
|
use crate::FileKind as K;
|
||||||
match &file.kind {
|
match &file.kind {
|
||||||
K::Content(content) => fs::write(path, content.as_bytes()).await?,
|
K::Content(content) => {
|
||||||
K::Dir(true) => fs::create_dir(path).await?,
|
if dry_run {
|
||||||
|
info!(
|
||||||
|
"would create {} ({} bytes from content)",
|
||||||
|
file.path,
|
||||||
|
content.len()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
fs::write(path, content.as_bytes()).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
K::Content64(content) => {
|
||||||
|
let content = base64_decode(content)?;
|
||||||
|
if dry_run {
|
||||||
|
info!(
|
||||||
|
"would create {} ({} bytes from content64)",
|
||||||
|
file.path,
|
||||||
|
content.len()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
fs::write(path, content).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
K::Dir(true) => {
|
||||||
|
if dry_run {
|
||||||
|
info!("would create {} (directory)", file.path);
|
||||||
|
} else {
|
||||||
|
fs::create_dir(path).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
|
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
|
||||||
K::Symlink(tgt) => fs::symlink(tgt, path).await?,
|
K::Symlink(tgt) => {
|
||||||
|
if dry_run {
|
||||||
|
info!("would create {} (symlink to {})", file.path, tgt);
|
||||||
|
} else {
|
||||||
|
fs::symlink(tgt, path).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dry_run {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match file.kind {
|
match file.kind {
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
use clap::{CommandFactory, Parser, Subcommand};
|
use clap::{CommandFactory, Parser, Subcommand};
|
||||||
use eyre::{format_err, Result};
|
use eyre::{format_err, Result};
|
||||||
|
use human_units::Duration;
|
||||||
use log::{debug, error};
|
use log::{debug, error};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::path::PathBuf;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
@@ -22,6 +25,9 @@ enum Command {
|
|||||||
/// path prefix (aka chroot)
|
/// path prefix (aka chroot)
|
||||||
#[arg(short = 'P', long, default_value = "/")]
|
#[arg(short = 'P', long, default_value = "/")]
|
||||||
prefix: String,
|
prefix: String,
|
||||||
|
/// don't really write files
|
||||||
|
#[arg(long)]
|
||||||
|
dry_run: bool,
|
||||||
},
|
},
|
||||||
Logger {
|
Logger {
|
||||||
/// Path where the logs are stored
|
/// Path where the logs are stored
|
||||||
@@ -33,6 +39,9 @@ enum Command {
|
|||||||
/// prefix log lines with time & stream
|
/// prefix log lines with time & stream
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
with_prefix: bool,
|
with_prefix: bool,
|
||||||
|
/// exec command in this cgroup
|
||||||
|
#[arg(long)]
|
||||||
|
cgroup: Option<String>,
|
||||||
command: String,
|
command: String,
|
||||||
args: Vec<String>,
|
args: Vec<String>,
|
||||||
},
|
},
|
||||||
@@ -65,6 +74,34 @@ enum Command {
|
|||||||
#[arg(long, default_value = "/")]
|
#[arg(long, default_value = "/")]
|
||||||
chroot: std::path::PathBuf,
|
chroot: std::path::PathBuf,
|
||||||
},
|
},
|
||||||
|
Proxy {
|
||||||
|
#[arg(long, short = 'l')]
|
||||||
|
listen: Vec<SocketAddr>,
|
||||||
|
targets: Vec<SocketAddr>,
|
||||||
|
/// target polling interval
|
||||||
|
#[arg(long, default_value = "30s")]
|
||||||
|
poll: Duration,
|
||||||
|
/// connect or check timeout
|
||||||
|
#[arg(long, default_value = "5s")]
|
||||||
|
timeout: Duration,
|
||||||
|
},
|
||||||
|
|
||||||
|
Cg {
|
||||||
|
#[command(subcommand)]
|
||||||
|
cmd: CgCmd,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand)]
|
||||||
|
enum CgCmd {
|
||||||
|
Ls {
|
||||||
|
#[arg(long)]
|
||||||
|
root: Option<PathBuf>,
|
||||||
|
#[arg(long, short = 'X')]
|
||||||
|
exclude: Vec<String>,
|
||||||
|
#[arg(long, short = 'C')]
|
||||||
|
cols: Option<String>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
@@ -84,14 +121,16 @@ async fn main() -> Result<()> {
|
|||||||
config,
|
config,
|
||||||
filters,
|
filters,
|
||||||
prefix,
|
prefix,
|
||||||
|
dry_run,
|
||||||
} => {
|
} => {
|
||||||
let filters = parse_globs(&filters)?;
|
let filters = parse_globs(&filters)?;
|
||||||
apply_config(&config, &filters, &prefix).await
|
apply_config(&config, &filters, &prefix, dry_run).await
|
||||||
}
|
}
|
||||||
C::Logger {
|
C::Logger {
|
||||||
ref log_path,
|
ref log_path,
|
||||||
ref log_name,
|
ref log_name,
|
||||||
with_prefix,
|
with_prefix,
|
||||||
|
cgroup,
|
||||||
command,
|
command,
|
||||||
args,
|
args,
|
||||||
} => {
|
} => {
|
||||||
@@ -103,7 +142,7 @@ async fn main() -> Result<()> {
|
|||||||
log_name,
|
log_name,
|
||||||
with_prefix,
|
with_prefix,
|
||||||
}
|
}
|
||||||
.run(command, &args)
|
.run(cgroup, command, &args)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
C::Log {
|
C::Log {
|
||||||
@@ -126,10 +165,37 @@ async fn main() -> Result<()> {
|
|||||||
.install(layer, version)
|
.install(layer, version)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
C::Proxy {
|
||||||
|
listen,
|
||||||
|
targets,
|
||||||
|
poll,
|
||||||
|
timeout,
|
||||||
|
} => Ok(dkl::proxy::Proxy {
|
||||||
|
listen_addrs: listen,
|
||||||
|
targets,
|
||||||
|
poll: poll.into(),
|
||||||
|
timeout: timeout.into(),
|
||||||
|
}
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
.map(|_| ())?),
|
||||||
|
|
||||||
|
C::Cg { cmd } => match cmd {
|
||||||
|
CgCmd::Ls {
|
||||||
|
root,
|
||||||
|
exclude,
|
||||||
|
cols,
|
||||||
|
} => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str) -> Result<()> {
|
async fn apply_config(
|
||||||
|
config_file: &str,
|
||||||
|
filters: &[glob::Pattern],
|
||||||
|
chroot: &str,
|
||||||
|
dry_run: bool,
|
||||||
|
) -> Result<()> {
|
||||||
let config = fs::read_to_string(config_file).await?;
|
let config = fs::read_to_string(config_file).await?;
|
||||||
let config: dkl::Config = serde_yaml::from_str(&config)?;
|
let config: dkl::Config = serde_yaml::from_str(&config)?;
|
||||||
|
|
||||||
@@ -141,7 +207,7 @@ async fn apply_config(config_file: &str, filters: &[glob::Pattern], chroot: &str
|
|||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
dkl::apply::files(&files, chroot).await
|
dkl::apply::files(&files, chroot, dry_run).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
|
|||||||
@@ -36,6 +36,10 @@ enum Command {
|
|||||||
},
|
},
|
||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
DlSet(DlSet),
|
DlSet(DlSet),
|
||||||
|
/// hash a password
|
||||||
|
Hash {
|
||||||
|
salt: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
@@ -103,14 +107,16 @@ async fn main() -> eyre::Result<()> {
|
|||||||
.parse_default_env()
|
.parse_default_env()
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
let token = std::env::var("DLS_TOKEN").map_err(|_| format_err!("DLS_TOKEN should be set"))?;
|
let dls = || {
|
||||||
|
let token = std::env::var("DLS_TOKEN").expect("DLS_TOKEN should be set");
|
||||||
let dls = dls::Client::new(cli.dls, token);
|
dls::Client::new(cli.dls, token)
|
||||||
|
};
|
||||||
|
|
||||||
use Command as C;
|
use Command as C;
|
||||||
match cli.command {
|
match cli.command {
|
||||||
C::Clusters => write_json(&dls.clusters().await?),
|
C::Clusters => write_json(&dls().clusters().await?),
|
||||||
C::Cluster { cluster, command } => {
|
C::Cluster { cluster, command } => {
|
||||||
|
let dls = dls();
|
||||||
let cluster = dls.cluster(cluster);
|
let cluster = dls.cluster(cluster);
|
||||||
|
|
||||||
use ClusterCommand as CC;
|
use ClusterCommand as CC;
|
||||||
@@ -155,8 +161,9 @@ async fn main() -> eyre::Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
C::Hosts => write_json(&dls.hosts().await?),
|
C::Hosts => write_json(&dls().hosts().await?),
|
||||||
C::Host { out, host, asset } => {
|
C::Host { out, host, asset } => {
|
||||||
|
let dls = dls();
|
||||||
let host_name = host.clone();
|
let host_name = host.clone();
|
||||||
let host = dls.host(host);
|
let host = dls.host(host);
|
||||||
match asset {
|
match asset {
|
||||||
@@ -171,7 +178,7 @@ async fn main() -> eyre::Result<()> {
|
|||||||
C::DlSet(set) => match set {
|
C::DlSet(set) => match set {
|
||||||
DlSet::Sign { expiry, items } => {
|
DlSet::Sign { expiry, items } => {
|
||||||
let req = dls::DownloadSetReq { expiry, items };
|
let req = dls::DownloadSetReq { expiry, items };
|
||||||
let signed = dls.sign_dl_set(&req).await?;
|
let signed = dls().sign_dl_set(&req).await?;
|
||||||
println!("{signed}");
|
println!("{signed}");
|
||||||
}
|
}
|
||||||
DlSet::Show { signed_set } => {
|
DlSet::Show { signed_set } => {
|
||||||
@@ -211,11 +218,19 @@ async fn main() -> eyre::Result<()> {
|
|||||||
name,
|
name,
|
||||||
asset,
|
asset,
|
||||||
} => {
|
} => {
|
||||||
|
let dls = dls();
|
||||||
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
|
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
|
||||||
let mut out = create_asset_file(out, &kind, &name, &asset).await?;
|
let mut out = create_asset_file(out, &kind, &name, &asset).await?;
|
||||||
copy_stream(stream, &mut out).await?;
|
copy_stream(stream, &mut out).await?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
C::Hash { salt } => {
|
||||||
|
let salt = dkl::base64_decode(&salt)?;
|
||||||
|
let passphrase = rpassword::prompt_password("password to hash: ")?;
|
||||||
|
let hash = dls::store::hash_password(&salt, &passphrase)?;
|
||||||
|
println!("hash (hex): {}", hex::encode(&hash));
|
||||||
|
println!("hash (base64): {}", dkl::base64_encode(&hash));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use std::collections::BTreeMap as Map;
|
|||||||
|
|
||||||
pub const TAKE_ALL: i16 = -1;
|
pub const TAKE_ALL: i16 = -1;
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub anti_phishing_code: String,
|
pub anti_phishing_code: String,
|
||||||
|
|
||||||
@@ -42,21 +42,11 @@ impl Config {
|
|||||||
pub fn new(bootstrap_dev: String) -> Self {
|
pub fn new(bootstrap_dev: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
anti_phishing_code: "Direktil<3".into(),
|
anti_phishing_code: "Direktil<3".into(),
|
||||||
keymap: None,
|
|
||||||
modules: None,
|
|
||||||
resolv_conf: None,
|
|
||||||
vpns: Map::new(),
|
|
||||||
networks: vec![],
|
|
||||||
auths: vec![],
|
|
||||||
ssh: Default::default(),
|
|
||||||
pre_lvm_crypt: vec![],
|
|
||||||
lvm: vec![],
|
|
||||||
crypt: vec![],
|
|
||||||
signer_public_key: None,
|
|
||||||
bootstrap: Bootstrap {
|
bootstrap: Bootstrap {
|
||||||
dev: bootstrap_dev,
|
dev: bootstrap_dev,
|
||||||
seed: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -88,6 +78,17 @@ pub struct NetworkInterface {
|
|||||||
pub udev: Option<UdevFilter>,
|
pub udev: Option<UdevFilter>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for NetworkInterface {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
var: "iface".into(),
|
||||||
|
n: 1,
|
||||||
|
regexps: Vec::new(),
|
||||||
|
udev: Some(UdevFilter::Eq("INTERFACE".into(), "eth0".into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct SSHServer {
|
pub struct SSHServer {
|
||||||
pub listen: String,
|
pub listen: String,
|
||||||
@@ -104,7 +105,7 @@ impl Default for SSHServer {
|
|||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct LvmVG {
|
pub struct LvmVG {
|
||||||
#[serde(alias = "vg")]
|
#[serde(rename = "vg", alias = "name")]
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub pvs: LvmPV,
|
pub pvs: LvmPV,
|
||||||
|
|
||||||
@@ -244,7 +245,7 @@ pub struct Raid {
|
|||||||
pub stripes: Option<u8>,
|
pub stripes: Option<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct Bootstrap {
|
pub struct Bootstrap {
|
||||||
pub dev: String,
|
pub dev: String,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
|||||||
381
src/cgroup.rs
Normal file
381
src/cgroup.rs
Normal file
@@ -0,0 +1,381 @@
|
|||||||
|
use std::borrow::Cow;
|
||||||
|
use std::fmt::Display;
|
||||||
|
use std::path::{Path as StdPath, PathBuf};
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use crate::{fs, human::Human};
|
||||||
|
|
||||||
|
pub const ROOT: &str = "/sys/fs/cgroup";
|
||||||
|
|
||||||
|
pub async fn ls(
|
||||||
|
parent: Option<impl AsRef<StdPath>>,
|
||||||
|
exclude: &[String],
|
||||||
|
columns: Option<&str>,
|
||||||
|
) -> fs::Result<()> {
|
||||||
|
let mut root = PathBuf::from(ROOT);
|
||||||
|
if let Some(parent) = parent {
|
||||||
|
root = root.join(parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
let cols: [(&str, fn(&Cgroup) -> String); _] = [
|
||||||
|
("wset", |cg| cg.memory.working_set().human()),
|
||||||
|
("anon", |cg| cg.memory.stat.anon.human()),
|
||||||
|
("min", |cg| cg.memory.min.human()),
|
||||||
|
("low", |cg| cg.memory.low.human()),
|
||||||
|
("high", |cg| cg.memory.high.human()),
|
||||||
|
("max", |cg| cg.memory.max.human()),
|
||||||
|
];
|
||||||
|
let cols = if let Some(columns) = columns {
|
||||||
|
(cols.into_iter())
|
||||||
|
.filter(|(n, _)| columns.split(',').any(|col| &col == n))
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
cols.to_vec()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut table = tabled::builder::Builder::new();
|
||||||
|
table.push_record(["cgroup"].into_iter().chain(cols.iter().map(|(n, _)| *n)));
|
||||||
|
|
||||||
|
let mut todo = vec![(Cgroup::root(root).await?, vec![], true)];
|
||||||
|
while let Some((cg, p_lasts, last)) = todo.pop() {
|
||||||
|
let mut name = String::new();
|
||||||
|
for last in p_lasts.iter().skip(1) {
|
||||||
|
name.push_str(if *last { " " } else { "| " });
|
||||||
|
}
|
||||||
|
if !p_lasts.is_empty() {
|
||||||
|
name.push_str(if last { "`- " } else { "|- " });
|
||||||
|
}
|
||||||
|
name.push_str(&cg.name());
|
||||||
|
|
||||||
|
table.push_record([name].into_iter().chain(cols.iter().map(|(_, f)| f(&cg))));
|
||||||
|
|
||||||
|
let mut p_lasts = p_lasts.clone();
|
||||||
|
p_lasts.push(last);
|
||||||
|
|
||||||
|
let mut children = cg.read_children().await?;
|
||||||
|
children.sort();
|
||||||
|
todo.extend(
|
||||||
|
(children.into_iter().rev())
|
||||||
|
.filter(|c| !exclude.iter().any(|x| x == &c.path.full_name()))
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, child)| (child, p_lasts.clone(), i == 0)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
use tabled::settings::{
|
||||||
|
object::{Column, Row},
|
||||||
|
Alignment, Modify,
|
||||||
|
};
|
||||||
|
let mut table = table.build();
|
||||||
|
table.with(tabled::settings::Style::psql());
|
||||||
|
table.with(Alignment::right());
|
||||||
|
table.with(Modify::list(Column::from(0), Alignment::left()));
|
||||||
|
table.with(Modify::list(Row::from(0), Alignment::left()));
|
||||||
|
|
||||||
|
println!("{}", table);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Cgroup {
|
||||||
|
path: Rc<Path>,
|
||||||
|
children: Vec<PathBuf>,
|
||||||
|
memory: Memory,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Cgroup {
|
||||||
|
pub async fn root(path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||||
|
let path = path.as_ref();
|
||||||
|
Self::read(Path::root(path), path).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read(cg_path: Rc<Path>, path: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||||
|
let path = path.as_ref();
|
||||||
|
|
||||||
|
use fs::Error as E;
|
||||||
|
|
||||||
|
let mut rd = fs::read_dir(path).await?;
|
||||||
|
|
||||||
|
let mut cg = Self {
|
||||||
|
path: cg_path,
|
||||||
|
children: Vec::new(),
|
||||||
|
memory: Memory::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(entry) = (rd.next_entry().await).map_err(|e| E::ReadDir(path.into(), e))? {
|
||||||
|
let path = entry.path();
|
||||||
|
|
||||||
|
let Some(file_name) = path.file_name() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (entry.file_type().await)
|
||||||
|
.map_err(|e| E::Stat(path.clone(), e))?
|
||||||
|
.is_dir()
|
||||||
|
{
|
||||||
|
cg.children.push(file_name.into());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let file_name = file_name.as_encoded_bytes();
|
||||||
|
let Some(idx) = file_name.iter().position(|b| *b == b'.') else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let (controller, param) = file_name.split_at(idx);
|
||||||
|
let param = ¶m[1..];
|
||||||
|
|
||||||
|
match controller {
|
||||||
|
b"memory" => match param {
|
||||||
|
b"current" => cg.memory.current = read_parse(path).await?,
|
||||||
|
b"low" => cg.memory.low = read_parse(path).await?,
|
||||||
|
b"high" => cg.memory.high = read_parse(path).await?,
|
||||||
|
b"min" => cg.memory.min = read_parse(path).await?,
|
||||||
|
b"max" => cg.memory.max = read_parse(path).await?,
|
||||||
|
b"stat" => cg.memory.stat.read_from(path).await?,
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(cg)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_children(&self) -> fs::Result<Vec<Self>> {
|
||||||
|
let mut r = Vec::with_capacity(self.children.len());
|
||||||
|
|
||||||
|
let mut dir = PathBuf::from(self.path.as_ref());
|
||||||
|
|
||||||
|
for child_name in &self.children {
|
||||||
|
dir.push(child_name);
|
||||||
|
let child_path = Path::Child(self.path.clone(), child_name.into());
|
||||||
|
r.push(Self::read(child_path.into(), &dir).await?);
|
||||||
|
dir.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn name(&self) -> Cow<'_, str> {
|
||||||
|
self.path.name()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn full_name(&self) -> String {
|
||||||
|
self.path.full_name()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn children(&self) -> impl Iterator<Item = &StdPath> {
|
||||||
|
self.children.iter().map(|n| n.as_path())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_child(&self, name: impl AsRef<StdPath>) -> fs::Result<Self> {
|
||||||
|
let name = name.as_ref();
|
||||||
|
let mut dir = PathBuf::from(self.path.as_ref());
|
||||||
|
dir.push(name);
|
||||||
|
Self::read(self.path.child(name), &dir).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write_param(
|
||||||
|
&self,
|
||||||
|
name: impl AsRef<StdPath>,
|
||||||
|
value: impl AsRef<[u8]>,
|
||||||
|
) -> fs::Result<()> {
|
||||||
|
let cg_dir = PathBuf::from(self.path.as_ref());
|
||||||
|
fs::write(cg_dir.join(name), value).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Cgroup {
|
||||||
|
fn eq(&self, o: &Self) -> bool {
|
||||||
|
&self.path == &o.path
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Eq for Cgroup {}
|
||||||
|
|
||||||
|
impl Ord for Cgroup {
|
||||||
|
fn cmp(&self, o: &Self) -> std::cmp::Ordering {
|
||||||
|
self.path.cmp(&o.path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl PartialOrd for Cgroup {
|
||||||
|
fn partial_cmp(&self, o: &Self) -> Option<std::cmp::Ordering> {
|
||||||
|
Some(self.cmp(o))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
enum Path {
|
||||||
|
Root(PathBuf),
|
||||||
|
Child(Rc<Path>, PathBuf),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Path {
|
||||||
|
fn name(&self) -> Cow<'_, str> {
|
||||||
|
match self {
|
||||||
|
Self::Root(_) => "/".into(),
|
||||||
|
Self::Child(_, n) => n.to_string_lossy(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn full_name(&self) -> String {
|
||||||
|
use Path::*;
|
||||||
|
match self {
|
||||||
|
Root(_) => "/".into(),
|
||||||
|
Child(parent, _) => match parent.as_ref() {
|
||||||
|
Root(_) => self.name().into(),
|
||||||
|
Child(_, _) => format!("{}/{}", parent.full_name(), self.name()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn depth(&self) -> usize {
|
||||||
|
use Path::*;
|
||||||
|
match self {
|
||||||
|
Root(_) => 0,
|
||||||
|
Child(p, _) => 1 + p.depth(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn root(dir: impl Into<PathBuf>) -> Rc<Self> {
|
||||||
|
Rc::new(Self::Root(dir.into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn child(self: &Rc<Self>, name: impl Into<PathBuf>) -> Rc<Self> {
|
||||||
|
Rc::new(Self::Child(self.clone(), name.into()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&Path> for PathBuf {
|
||||||
|
fn from(mut p: &Path) -> Self {
|
||||||
|
let mut stack = Vec::with_capacity(p.depth() + 1);
|
||||||
|
loop {
|
||||||
|
match p {
|
||||||
|
Path::Root(root_path) => {
|
||||||
|
stack.push(root_path);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Path::Child(parent, n) => {
|
||||||
|
stack.push(n);
|
||||||
|
p = parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = stack.iter().map(|p| p.as_os_str().len() + 1).sum::<usize>() - 1;
|
||||||
|
|
||||||
|
let mut buf = PathBuf::with_capacity(len);
|
||||||
|
buf.extend(stack.into_iter().rev());
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_path_to_pathbuf() {
|
||||||
|
let root = Path::root("/a/b");
|
||||||
|
let c1 = root.child("c1");
|
||||||
|
let c1_1 = c1.child("c1-1");
|
||||||
|
|
||||||
|
assert_eq!(PathBuf::from("/a/b/c1"), PathBuf::from(c1.as_ref()));
|
||||||
|
assert_eq!(PathBuf::from("/a/b/c1/c1-1"), PathBuf::from(c1_1.as_ref()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct Memory {
|
||||||
|
current: Option<u64>,
|
||||||
|
low: Option<u64>,
|
||||||
|
high: Option<Max>,
|
||||||
|
min: Option<u64>,
|
||||||
|
max: Option<Max>,
|
||||||
|
stat: MemoryStat,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Memory {
|
||||||
|
/// working set as defined by cAdvisor
|
||||||
|
/// (https://github.com/google/cadvisor/blob/e1ccfa9b4cf2e17d74e0f5526b6487b74b704503/container/libcontainer/handler.go#L853-L862)
|
||||||
|
fn working_set(&self) -> Option<u64> {
|
||||||
|
let cur = self.current?;
|
||||||
|
let inactive = self.stat.inactive_file?;
|
||||||
|
(inactive <= cur).then(|| cur - inactive)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct MemoryStat {
|
||||||
|
anon: Option<u64>,
|
||||||
|
file: Option<u64>,
|
||||||
|
kernel: Option<u64>,
|
||||||
|
kernel_stack: Option<u64>,
|
||||||
|
pagetables: Option<u64>,
|
||||||
|
shmem: Option<u64>,
|
||||||
|
inactive_file: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Max {
|
||||||
|
Num(u64),
|
||||||
|
Max,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Max {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Num(n) => write!(f, "{n}"),
|
||||||
|
Self::Max => f.write_str("max"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Human for Max {
|
||||||
|
fn human(&self) -> String {
|
||||||
|
match self {
|
||||||
|
Self::Num(n) => n.human(),
|
||||||
|
Self::Max => "+∞".into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for Max {
|
||||||
|
type Err = std::num::ParseIntError;
|
||||||
|
fn from_str(s: &str) -> std::result::Result<Self, <Self as FromStr>::Err> {
|
||||||
|
Ok(match s {
|
||||||
|
"max" => Self::Max,
|
||||||
|
s => Self::Num(s.parse()?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_parse<T: FromStr>(path: impl AsRef<StdPath>) -> fs::Result<Option<T>>
|
||||||
|
where
|
||||||
|
T::Err: Display,
|
||||||
|
{
|
||||||
|
let path = path.as_ref();
|
||||||
|
|
||||||
|
(fs::read_to_string(path).await?)
|
||||||
|
.trim_ascii()
|
||||||
|
.parse()
|
||||||
|
.map_err(|e| fs::Error::Other(path.into(), format!("parse failed: {e}")))
|
||||||
|
.map(|v| Some(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MemoryStat {
|
||||||
|
async fn read_from(&mut self, path: impl AsRef<StdPath>) -> fs::Result<()> {
|
||||||
|
for line in (fs::read_to_string(path).await?).lines() {
|
||||||
|
let Some((key, value)) = line.split_once(' ') else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let value = value.parse::<u64>().ok();
|
||||||
|
match key {
|
||||||
|
"anon" => self.anon = value,
|
||||||
|
"file" => self.file = value,
|
||||||
|
"kernel" => self.kernel = value,
|
||||||
|
"kernel_stack" => self.kernel_stack = value,
|
||||||
|
"pagetables" => self.pagetables = value,
|
||||||
|
"shmem" => self.shmem = value,
|
||||||
|
"inactive_file" => self.inactive_file = value,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
41
src/dls.rs
41
src/dls.rs
@@ -6,6 +6,8 @@ use std::collections::BTreeMap as Map;
|
|||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
pub mod store;
|
||||||
|
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
token: String,
|
token: String,
|
||||||
@@ -159,6 +161,32 @@ impl<'t> Host<'t> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, serde::Deserialize, serde::Serialize)]
|
||||||
|
#[serde(rename_all = "PascalCase")]
|
||||||
|
pub struct Config {
|
||||||
|
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||||
|
pub clusters: Vec<ClusterConfig>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||||
|
pub hosts: Vec<HostConfig>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||||
|
pub host_templates: Vec<HostConfig>,
|
||||||
|
#[serde(default, rename = "SSLConfig")]
|
||||||
|
pub ssl_config: String,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_null_as_default")]
|
||||||
|
pub extra_ca_certs: Map<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// compensate for go's encoder pitfalls
|
||||||
|
use serde::{Deserialize, Deserializer};
|
||||||
|
fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
|
||||||
|
where
|
||||||
|
T: Default + Deserialize<'de>,
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let opt = Option::deserialize(deserializer)?;
|
||||||
|
Ok(opt.unwrap_or_default())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(serde::Deserialize, serde::Serialize)]
|
#[derive(serde::Deserialize, serde::Serialize)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct ClusterConfig {
|
pub struct ClusterConfig {
|
||||||
@@ -171,15 +199,15 @@ pub struct ClusterConfig {
|
|||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct HostConfig {
|
pub struct HostConfig {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub cluster_name: Option<String>,
|
pub cluster_name: Option<String>,
|
||||||
|
|
||||||
#[serde(rename = "IPs")]
|
#[serde(rename = "IPs")]
|
||||||
pub ips: Vec<IpAddr>,
|
pub ips: Vec<IpAddr>,
|
||||||
|
|
||||||
#[serde(skip_serializing_if = "Map::is_empty")]
|
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||||
pub labels: Map<String, String>,
|
pub labels: Map<String, String>,
|
||||||
#[serde(skip_serializing_if = "Map::is_empty")]
|
#[serde(default, skip_serializing_if = "Map::is_empty")]
|
||||||
pub annotations: Map<String, String>,
|
pub annotations: Map<String, String>,
|
||||||
|
|
||||||
#[serde(rename = "IPXE", skip_serializing_if = "Option::is_none")]
|
#[serde(rename = "IPXE", skip_serializing_if = "Option::is_none")]
|
||||||
@@ -189,10 +217,13 @@ pub struct HostConfig {
|
|||||||
pub kernel: String,
|
pub kernel: String,
|
||||||
pub versions: Map<String, String>,
|
pub versions: Map<String, String>,
|
||||||
|
|
||||||
|
/// initrd config template
|
||||||
pub bootstrap_config: String,
|
pub bootstrap_config: String,
|
||||||
#[serde(skip_serializing_if = "Map::is_empty")]
|
/// files to add to the final initrd config, with rendering
|
||||||
pub initrd_files: Map<String, String>,
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub initrd_files: Vec<crate::File>,
|
||||||
|
|
||||||
|
/// system config template
|
||||||
pub config: String,
|
pub config: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
17
src/dls/store.rs
Normal file
17
src/dls/store.rs
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
pub fn hash_password(salt: &[u8], passphrase: &str) -> argon2::Result<[u8; 32]> {
|
||||||
|
let hash = argon2::hash_raw(
|
||||||
|
passphrase.as_bytes(),
|
||||||
|
salt,
|
||||||
|
&argon2::Config {
|
||||||
|
variant: argon2::Variant::Argon2id,
|
||||||
|
hash_length: 32,
|
||||||
|
time_cost: 1,
|
||||||
|
mem_cost: 65536,
|
||||||
|
thread_mode: argon2::ThreadMode::Parallel,
|
||||||
|
lanes: 4,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
unsafe { Ok(hash.try_into().unwrap_unchecked()) }
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
use eyre::{format_err, Result};
|
use eyre::{Result, format_err};
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::{fs, io::AsyncWriteExt, process::Command};
|
use tokio::{fs, io::AsyncWriteExt, process::Command};
|
||||||
|
|||||||
52
src/fs.rs
52
src/fs.rs
@@ -1,9 +1,53 @@
|
|||||||
use eyre::Result;
|
|
||||||
use std::fs::Metadata;
|
use std::fs::Metadata;
|
||||||
use std::path::PathBuf;
|
use std::path::{Path, PathBuf};
|
||||||
use tokio::fs::read_dir;
|
use tokio::fs;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
pub use tokio::fs::ReadDir;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("{0}: read dir: {1}")]
|
||||||
|
ReadDir(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: exists: {1}")]
|
||||||
|
Exists(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: read: {1}")]
|
||||||
|
Read(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: stat: {1}")]
|
||||||
|
Stat(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: create dir: {1}")]
|
||||||
|
CreateDir(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: write: {1}")]
|
||||||
|
Write(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: remove file: {1}")]
|
||||||
|
RemoveFile(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: symlink: {1}")]
|
||||||
|
Symlink(PathBuf, std::io::Error),
|
||||||
|
#[error("{0}: {1}")]
|
||||||
|
Other(PathBuf, String),
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! wrap_path {
|
||||||
|
($fn:ident $( ( $( $pname:ident : $ptype:ty ),* ) )? -> $result:ty, $err:ident) => {
|
||||||
|
pub async fn $fn(path: impl AsRef<Path>$($(, $pname: $ptype)*)?) -> Result<$result> {
|
||||||
|
let path = path.as_ref();
|
||||||
|
fs::$fn(path $($(, $pname)*)?).await.map_err(|e| Error::$err(path.into(), e))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
wrap_path!(read_dir -> ReadDir, ReadDir);
|
||||||
|
wrap_path!(try_exists -> bool, Exists);
|
||||||
|
wrap_path!(read -> Vec<u8>, Read);
|
||||||
|
wrap_path!(read_to_string -> String, Read);
|
||||||
|
wrap_path!(create_dir -> (), CreateDir);
|
||||||
|
wrap_path!(create_dir_all -> (), CreateDir);
|
||||||
|
wrap_path!(remove_file -> (), RemoveFile);
|
||||||
|
wrap_path!(symlink(link_src: impl AsRef<Path>) -> (), Symlink);
|
||||||
|
wrap_path!(write(content: impl AsRef<[u8]>) -> (), Write);
|
||||||
|
|
||||||
pub fn spawn_walk_dir(
|
pub fn spawn_walk_dir(
|
||||||
dir: impl Into<PathBuf> + Send + 'static,
|
dir: impl Into<PathBuf> + Send + 'static,
|
||||||
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
|
) -> mpsc::Receiver<Result<(PathBuf, Metadata)>> {
|
||||||
@@ -24,7 +68,7 @@ pub async fn walk_dir(dir: impl Into<PathBuf>, tx: mpsc::Sender<Result<(PathBuf,
|
|||||||
let entry = match rd.next_entry().await {
|
let entry = match rd.next_entry().await {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if tx.send(Err(e.into())).await.is_err() {
|
if tx.send(Err(Error::ReadDir(dir.clone(), e))).await.is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
todo.pop_front(); // skip dir on error
|
todo.pop_front(); // skip dir on error
|
||||||
|
|||||||
35
src/human.rs
Normal file
35
src/human.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use human_units::FormatSize;
|
||||||
|
use std::fmt::{Display, Formatter, Result};
|
||||||
|
|
||||||
|
pub trait Human {
|
||||||
|
fn human(&self) -> String;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Quantity(u64);
|
||||||
|
|
||||||
|
impl Display for Quantity {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
|
||||||
|
self.0.format_size().fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Human for Quantity {
|
||||||
|
fn human(&self) -> String {
|
||||||
|
self.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Human for u64 {
|
||||||
|
fn human(&self) -> String {
|
||||||
|
self.format_size().to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Human> Human for Option<T> {
|
||||||
|
fn human(&self) -> String {
|
||||||
|
match self {
|
||||||
|
Some(h) => h.human(),
|
||||||
|
None => "◌".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
32
src/lib.rs
32
src/lib.rs
@@ -1,9 +1,13 @@
|
|||||||
pub mod apply;
|
pub mod apply;
|
||||||
pub mod bootstrap;
|
pub mod bootstrap;
|
||||||
|
pub mod cgroup;
|
||||||
pub mod dls;
|
pub mod dls;
|
||||||
pub mod dynlay;
|
pub mod dynlay;
|
||||||
pub mod fs;
|
pub mod fs;
|
||||||
|
pub mod human;
|
||||||
pub mod logger;
|
pub mod logger;
|
||||||
|
pub mod proxy;
|
||||||
|
pub mod rc;
|
||||||
|
|
||||||
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -52,7 +56,7 @@ pub struct User {
|
|||||||
pub gid: Option<u32>,
|
pub gid: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct File {
|
pub struct File {
|
||||||
pub path: String,
|
pub path: String,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
@@ -61,10 +65,32 @@ pub struct File {
|
|||||||
pub kind: FileKind,
|
pub kind: FileKind,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "lowercase")]
|
||||||
pub enum FileKind {
|
pub enum FileKind {
|
||||||
Content(String),
|
Content(String),
|
||||||
|
Content64(String),
|
||||||
Symlink(String),
|
Symlink(String),
|
||||||
Dir(bool),
|
Dir(bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub fn has_file(&self, path: &str) -> bool {
|
||||||
|
self.files.iter().any(|f| f.path == path)
|
||||||
|
}
|
||||||
|
pub fn file(&self, path: &str) -> Option<&File> {
|
||||||
|
self.files.iter().find(|f| f.path == path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn base64_decode(s: &str) -> Result<Vec<u8>, base64::DecodeError> {
|
||||||
|
use base64::{Engine as _, prelude::BASE64_STANDARD_NO_PAD as B64};
|
||||||
|
B64.decode(s.trim_end_matches('='))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn base64_encode(b: &[u8]) -> String {
|
||||||
|
use base64::{Engine as _, prelude::BASE64_STANDARD as B64};
|
||||||
|
B64.encode(b)
|
||||||
|
}
|
||||||
|
|||||||
215
src/logger.rs
215
src/logger.rs
@@ -5,13 +5,15 @@ use log::{debug, error, warn};
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::{self, File},
|
fs::File,
|
||||||
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
|
||||||
process,
|
process,
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
time::{sleep, Duration},
|
time::{sleep, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::{cgroup, fs};
|
||||||
|
|
||||||
pub type Timestamp = chrono::DateTime<Utc>;
|
pub type Timestamp = chrono::DateTime<Utc>;
|
||||||
|
|
||||||
const TS_FORMAT: &str = "%Y%m%d_%H";
|
const TS_FORMAT: &str = "%Y%m%d_%H";
|
||||||
@@ -27,7 +29,7 @@ pub struct Logger<'t> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> Logger<'t> {
|
impl<'t> Logger<'t> {
|
||||||
pub async fn run(&self, command: &str, args: &[String]) -> Result<()> {
|
pub async fn run(&self, cgroup: Option<String>, command: &str, args: &[String]) -> Result<()> {
|
||||||
// make sure we can at least open the log before starting the command
|
// make sure we can at least open the log before starting the command
|
||||||
let archives_path = &format!("{path}/archives", path = self.log_path);
|
let archives_path = &format!("{path}/archives", path = self.log_path);
|
||||||
(fs::create_dir_all(archives_path).await)
|
(fs::create_dir_all(archives_path).await)
|
||||||
@@ -45,20 +47,42 @@ impl<'t> Logger<'t> {
|
|||||||
));
|
));
|
||||||
|
|
||||||
// start the command
|
// start the command
|
||||||
let mut child = process::Command::new(command)
|
let mut cmd = process::Command::new(command);
|
||||||
.args(args)
|
cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||||
.stdout(Stdio::piped())
|
if let Some(cgroup) = cgroup.as_deref() {
|
||||||
.stderr(Stdio::piped())
|
let mut cg_path = PathBuf::from(cgroup::ROOT);
|
||||||
.spawn()?;
|
cg_path.push(cgroup);
|
||||||
|
cg_path.push(self.log_name);
|
||||||
|
|
||||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
use std::io::ErrorKind as K;
|
||||||
|
match tokio::fs::create_dir(&cg_path).await {
|
||||||
|
Ok(_) => debug!("created dir {}", cg_path.display()),
|
||||||
|
Err(e) if e.kind() == K::AlreadyExists => {
|
||||||
|
debug!("existing dir {}", cg_path.display())
|
||||||
|
}
|
||||||
|
Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()),
|
||||||
|
}
|
||||||
|
|
||||||
|
let procs_file = cg_path.join("cgroup.procs");
|
||||||
|
debug!("procs file {}", procs_file.display());
|
||||||
|
fs::write(&procs_file, b"0").await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?;
|
||||||
|
|
||||||
|
let (tx, mut rx) = mpsc::channel(8);
|
||||||
|
|
||||||
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone()));
|
||||||
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx));
|
||||||
|
|
||||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
// forward signals
|
||||||
|
if let Some(child_pid) = child.id() {
|
||||||
|
forward_signals_to(child_pid as i32);
|
||||||
|
}
|
||||||
|
|
||||||
// handle output
|
// handle output
|
||||||
|
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
r = rx.recv() => {
|
r = rx.recv() => {
|
||||||
@@ -78,13 +102,14 @@ impl<'t> Logger<'t> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let status = child.wait().await?;
|
||||||
|
|
||||||
// finalize
|
// finalize
|
||||||
while let Err(e) = current_log.flush().await {
|
while let Err(e) = current_log.flush().await {
|
||||||
error!("final log flush failed: {e}");
|
error!("final log flush failed: {e}");
|
||||||
sleep(WRITE_RETRY_DELAY).await;
|
sleep(WRITE_RETRY_DELAY).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let status = child.wait().await?;
|
|
||||||
std::process::exit(status.code().unwrap_or(-1));
|
std::process::exit(status.code().unwrap_or(-1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +137,9 @@ impl<'t> Logger<'t> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
out.write_all(&log.line).await?;
|
out.write_all(&log.line).await?;
|
||||||
|
if log.line.last() != Some(&b'\n') {
|
||||||
|
out.write("↵\n".as_bytes()).await?;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,29 +153,28 @@ impl<'t> Logger<'t> {
|
|||||||
.open(log_file)
|
.open(log_file)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let link_src = &format!(
|
let link_src = &PathBuf::from(self.log_path)
|
||||||
"{path}/{name}.log",
|
.join(self.log_name)
|
||||||
path = self.log_path,
|
.with_added_extension("log");
|
||||||
name = self.log_name
|
let link_tgt = &self.archive_rel_path(ts);
|
||||||
);
|
|
||||||
let link_tgt = log_file
|
|
||||||
.strip_prefix(&format!("{}/", self.log_path))
|
|
||||||
.unwrap_or(log_file);
|
|
||||||
|
|
||||||
let _ = fs::remove_file(link_src).await;
|
let _ = fs::remove_file(link_src).await;
|
||||||
fs::symlink(link_tgt, link_src)
|
fs::symlink(link_tgt, link_src).await.map_err(|e| {
|
||||||
.await
|
format_err!(
|
||||||
.map_err(|e| format_err!("symlink {link_src} -> {link_tgt} failed: {e}",))?;
|
"symlink {s} -> {t} failed: {e}",
|
||||||
|
s = link_src.display(),
|
||||||
|
t = link_tgt.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(file)
|
Ok(file)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn archive_path(&self, ts: Timestamp) -> String {
|
fn archive_path(&self, ts: Timestamp) -> PathBuf {
|
||||||
format!(
|
PathBuf::from(self.log_path).join(self.archive_rel_path(ts))
|
||||||
"{path}/archives/{file}",
|
}
|
||||||
path = self.log_path,
|
fn archive_rel_path(&self, ts: Timestamp) -> PathBuf {
|
||||||
file = self.archive_file(ts)
|
PathBuf::from("archives").join(self.archive_file(ts))
|
||||||
)
|
|
||||||
}
|
}
|
||||||
fn archive_file(&self, ts: Timestamp) -> String {
|
fn archive_file(&self, ts: Timestamp) -> String {
|
||||||
format!(
|
format!(
|
||||||
@@ -158,39 +185,101 @@ impl<'t> Logger<'t> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn forward_signals_to(pid: i32) {
|
||||||
|
use nix::{
|
||||||
|
sys::signal::{kill, Signal},
|
||||||
|
unistd::Pid,
|
||||||
|
};
|
||||||
|
use signal_hook::{consts::*, low_level::register};
|
||||||
|
|
||||||
|
debug!("forwarding signals to pid {pid}");
|
||||||
|
|
||||||
|
let pid = Pid::from_raw(pid);
|
||||||
|
let signals = [
|
||||||
|
SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1, SIGUSR2, SIGPIPE, SIGALRM,
|
||||||
|
];
|
||||||
|
|
||||||
|
for sig in signals {
|
||||||
|
let Ok(signal) = Signal::try_from(sig) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
unsafe {
|
||||||
|
register(sig, move || {
|
||||||
|
debug!("forwarding {signal} to {pid}");
|
||||||
|
let _ = kill(pid, signal);
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct LogItem {
|
struct LogItem {
|
||||||
stream_name: &'static str,
|
stream_name: &'static str,
|
||||||
ts: chrono::DateTime<chrono::Utc>,
|
ts: chrono::DateTime<chrono::Utc>,
|
||||||
line: Vec<u8>,
|
line: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn copy(
|
async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc::Sender<LogItem>) {
|
||||||
stream_name: &'static str,
|
|
||||||
out: impl AsyncRead + Unpin,
|
|
||||||
tx: mpsc::UnboundedSender<LogItem>,
|
|
||||||
) {
|
|
||||||
let mut out = BufReader::new(out);
|
|
||||||
let buf_size = page_size::get();
|
let buf_size = page_size::get();
|
||||||
|
let mut out = BufReader::with_capacity(buf_size, out);
|
||||||
|
let mut line = Vec::with_capacity(buf_size);
|
||||||
|
|
||||||
|
macro_rules! send_line {
|
||||||
|
() => {
|
||||||
|
let log = LogItem {
|
||||||
|
stream_name,
|
||||||
|
ts: chrono::Utc::now(),
|
||||||
|
line: line.clone(),
|
||||||
|
};
|
||||||
|
if let Err(e) = tx.send(log).await {
|
||||||
|
warn!("send line failed: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut line = Vec::with_capacity(buf_size);
|
let Ok(buf) = (out.fill_buf())
|
||||||
if let Err(e) = out.read_until(b'\n', &mut line).await {
|
.await
|
||||||
warn!("read {stream_name} failed: {e}");
|
.inspect_err(|e| warn!("read {stream_name} failed: {e}"))
|
||||||
return;
|
else {
|
||||||
}
|
break;
|
||||||
if line.is_empty() {
|
};
|
||||||
|
|
||||||
|
if buf.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let log = LogItem {
|
let remaining = buf_size - line.len();
|
||||||
stream_name,
|
|
||||||
ts: chrono::Utc::now(),
|
if let Some(pos) = memchr::memchr(b'\n', buf) {
|
||||||
line,
|
let len = pos + 1;
|
||||||
};
|
let mut buf = &buf[..len];
|
||||||
if let Err(e) = tx.send(log) {
|
|
||||||
warn!("send line failed: {e}");
|
if len > remaining {
|
||||||
return;
|
line.extend_from_slice(&buf[..remaining]);
|
||||||
|
send_line!();
|
||||||
|
buf = &buf[remaining..];
|
||||||
|
}
|
||||||
|
|
||||||
|
line.extend_from_slice(buf);
|
||||||
|
out.consume(len);
|
||||||
|
send_line!();
|
||||||
|
} else if buf.len() > remaining {
|
||||||
|
line.extend_from_slice(&buf[..remaining]);
|
||||||
|
out.consume(remaining);
|
||||||
|
send_line!();
|
||||||
|
} else {
|
||||||
|
line.extend_from_slice(buf);
|
||||||
|
let len = buf.len();
|
||||||
|
out.consume(len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !line.is_empty() {
|
||||||
|
send_line!();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
pub fn trunc_ts(ts: Timestamp) -> Timestamp {
|
||||||
@@ -250,14 +339,14 @@ async fn compress(path: impl AsRef<Path>) {
|
|||||||
let mut out = ZstdEncoder::new(out);
|
let mut out = ZstdEncoder::new(out);
|
||||||
async {
|
async {
|
||||||
tokio::io::copy(&mut input, &mut out).await?;
|
tokio::io::copy(&mut input, &mut out).await?;
|
||||||
out.flush().await
|
out.shutdown().await
|
||||||
}
|
}
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
|
.map_err(|e| format_err!("compression of {path_str} failed: {e}"))?;
|
||||||
|
|
||||||
fs::remove_file(path)
|
fs::remove_file(path)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
.map_err(|e| format_err!("remove {path_str} failed: {e}"))
|
||||||
}
|
}
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -267,19 +356,22 @@ async fn compress(path: impl AsRef<Path>) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
|
pub fn parse_ts(ts: &str) -> std::result::Result<Timestamp, chrono::ParseError> {
|
||||||
let dt =
|
let format = &format!("{TS_FORMAT}%M%S");
|
||||||
chrono::NaiveDateTime::parse_from_str(&format!("{ts}0000"), &format!("{TS_FORMAT}%M%S"))?;
|
let full_ts = &format!("{ts}0000");
|
||||||
|
let dt = chrono::NaiveDateTime::parse_from_str(full_ts, format)?;
|
||||||
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
|
Ok(Timestamp::from_naive_utc_and_offset(dt, Utc))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<LogFile>> {
|
pub async fn log_files(log_path: &str, log_name: &str) -> fs::Result<Vec<LogFile>> {
|
||||||
let mut dir = PathBuf::from(log_path);
|
let mut dir = PathBuf::from(log_path);
|
||||||
dir.push("archives");
|
dir.push("archives");
|
||||||
|
|
||||||
let mut entries = Vec::new();
|
let mut entries = Vec::new();
|
||||||
let mut read_dir = fs::read_dir(dir).await?;
|
let mut read_dir = fs::read_dir(&dir).await?;
|
||||||
|
|
||||||
while let Some(entry) = read_dir.next_entry().await? {
|
while let Some(entry) =
|
||||||
|
(read_dir.next_entry().await).map_err(|e| fs::Error::ReadDir(dir.clone(), e))?
|
||||||
|
{
|
||||||
let file_name = entry.file_name();
|
let file_name = entry.file_name();
|
||||||
let Some(file_name) = file_name.to_str() else {
|
let Some(file_name) = file_name.to_str() else {
|
||||||
continue;
|
continue;
|
||||||
@@ -318,16 +410,27 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
|
|||||||
Ok(entries)
|
Ok(entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Ord)]
|
#[derive(Debug)]
|
||||||
pub struct LogFile {
|
pub struct LogFile {
|
||||||
pub path: PathBuf,
|
pub path: PathBuf,
|
||||||
pub compressed: bool,
|
pub compressed: bool,
|
||||||
pub timestamp: Timestamp,
|
pub timestamp: Timestamp,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Eq for LogFile {}
|
||||||
|
impl PartialEq for LogFile {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.timestamp == other.timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Ord for LogFile {
|
||||||
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||||
|
self.timestamp.cmp(&other.timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
impl PartialOrd for LogFile {
|
impl PartialOrd for LogFile {
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||||
self.timestamp.partial_cmp(&other.timestamp)
|
Some(self.cmp(other))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
136
src/proxy.rs
Normal file
136
src/proxy.rs
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
use log::{info, log_enabled, warn};
|
||||||
|
use std::convert::Infallible;
|
||||||
|
use std::io;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
|
||||||
|
use std::time::Duration;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
pub struct Proxy {
|
||||||
|
pub listen_addrs: Vec<SocketAddr>,
|
||||||
|
pub targets: Vec<SocketAddr>,
|
||||||
|
pub poll: Duration,
|
||||||
|
pub timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("failed to listen on {0}: {1}")]
|
||||||
|
ListenFailed(SocketAddr, std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
impl Proxy {
|
||||||
|
pub async fn run(self) -> Result<Infallible> {
|
||||||
|
let mut listeners = Vec::with_capacity(self.listen_addrs.len());
|
||||||
|
for addr in self.listen_addrs {
|
||||||
|
listeners.push(
|
||||||
|
TcpListener::bind(&addr)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::ListenFailed(addr, e))?,
|
||||||
|
);
|
||||||
|
info!("listening on {addr}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// all targets are initially ok (better land on a down one than just fail)
|
||||||
|
let targets: Vec<_> = (self.targets.into_iter())
|
||||||
|
.map(|addr| TargetStatus {
|
||||||
|
addr,
|
||||||
|
up: AtomicBool::new(true),
|
||||||
|
timeout: self.timeout,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// the proxy runs forever -> using 'static is not a leak
|
||||||
|
let targets = targets.leak();
|
||||||
|
|
||||||
|
for listener in listeners {
|
||||||
|
tokio::spawn(proxy_listener(listener, targets));
|
||||||
|
}
|
||||||
|
|
||||||
|
check_targets(targets, self.poll).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TargetStatus {
|
||||||
|
addr: SocketAddr,
|
||||||
|
up: AtomicBool,
|
||||||
|
timeout: Duration,
|
||||||
|
}
|
||||||
|
impl TargetStatus {
|
||||||
|
fn is_up(&self) -> bool {
|
||||||
|
self.up.load(Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_up(&self, is_up: bool) {
|
||||||
|
let prev = self.up.swap(is_up, Relaxed);
|
||||||
|
if prev != is_up {
|
||||||
|
if is_up {
|
||||||
|
info!("{} is up", self.addr);
|
||||||
|
} else {
|
||||||
|
warn!("{} is down", self.addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect(&self) -> io::Result<TcpStream> {
|
||||||
|
let r = match time::timeout(self.timeout, TcpStream::connect(self.addr)).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.set_up(r.is_ok());
|
||||||
|
r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn check_targets(targets: &'static [TargetStatus], poll: Duration) -> ! {
|
||||||
|
use tokio::time;
|
||||||
|
let mut poll_ticker = time::interval(poll);
|
||||||
|
poll_ticker.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
poll_ticker.tick().await;
|
||||||
|
|
||||||
|
let mut tasks = tokio::task::JoinSet::new();
|
||||||
|
|
||||||
|
for target in targets {
|
||||||
|
tasks.spawn(target.connect());
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks.join_all().await;
|
||||||
|
|
||||||
|
if log_enabled!(log::Level::Info) {
|
||||||
|
let mut infos = String::new();
|
||||||
|
for ts in targets.iter() {
|
||||||
|
infos.push_str(&format!("{} ", ts.addr));
|
||||||
|
infos.push_str(if ts.is_up() { "up " } else { "down " });
|
||||||
|
}
|
||||||
|
info!("{infos}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn proxy_listener(listener: TcpListener, targets: &'static [TargetStatus]) {
|
||||||
|
let mut rng = fastrand::Rng::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let mut active = Vec::with_capacity(targets.len());
|
||||||
|
let (mut src, _) = listener.accept().await.expect("listener.accept() failed");
|
||||||
|
|
||||||
|
active.extend((targets.iter().enumerate()).filter_map(|(i, ts)| ts.is_up().then_some(i)));
|
||||||
|
rng.shuffle(&mut active);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
for i in active {
|
||||||
|
if let Ok(mut dst) = targets[i].connect().await {
|
||||||
|
let _ = tokio::io::copy_bidirectional(&mut src, &mut dst).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user