1 Commits

Author SHA1 Message Date
55884f92ea introduce rust 2025-06-23 02:03:02 +02:00
25 changed files with 1660 additions and 280 deletions

50
Cargo.lock generated
View File

@ -76,12 +76,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "autocfg"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "backtrace"
version = "0.3.75"
@ -207,6 +201,7 @@ dependencies = [
"env_logger",
"eyre",
"itertools",
"libc",
"log",
"nix",
"regex",
@ -214,6 +209,7 @@ dependencies = [
"serde_json",
"serde_yaml",
"shell-escape",
"termios",
"tokio",
]
@ -240,9 +236,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a194df1107f33c79f4f93d02c80798520551949d59dfad22b6157048a88cca93"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
@ -253,9 +249,9 @@ dependencies = [
[[package]]
name = "jiff-static"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c6e1db7ed32c6c71b759497fae34bf7933636f75a251b9e736555da426f6442"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
@ -264,9 +260,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.172"
version = "0.2.173"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
checksum = "d8cfeafaffdbc32176b64fb251369d52ea9f0a8fbc6f8759edffef7b525d64bb"
[[package]]
name = "log"
@ -280,15 +276,6 @@ version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "memoffset"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.8.9"
@ -319,7 +306,6 @@ dependencies = [
"cfg-if",
"cfg_aliases",
"libc",
"memoffset",
]
[[package]]
@ -483,6 +469,16 @@ dependencies = [
"libc",
]
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "syn"
version = "2.0.103"
@ -494,6 +490,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "termios"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "411c5bf740737c7918b8b1fe232dca4dc9f8e754b8ad5e20966814001ed0ac6b"
dependencies = [
"libc",
]
[[package]]
name = "tokio"
version = "1.45.1"
@ -506,6 +511,7 @@ dependencies = [
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
]

View File

@ -8,16 +8,19 @@ strip = true
panic = "abort"
opt-level = "z"
lto = true
codegen-units = 1
[dependencies]
libc = { version = "0.2", default-features = false }
env_logger = "0.11.3"
eyre = "0.6.12"
itertools = "0.14.0"
log = "0.4.21"
nix = { version = "0.30.1", features = ["feature", "net", "reboot"] }
nix = { version = "0.30.1", features = ["feature", "mount", "process", "reboot"] }
regex = "1.11.1"
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
serde_yaml = "0.9.34"
shell-escape = "0.1.5"
tokio = { version = "1.38.0", features = ["rt", "signal", "process", "macros"] }
tokio = { version = "1.38.0", features = ["rt", "net", "fs", "process", "io-std", "io-util", "sync", "macros", "signal"] }
termios = "0.3.3"

View File

@ -16,12 +16,15 @@ workdir /layer
run . /etc/os-release \
&& wget -O- https://dl-cdn.alpinelinux.org/alpine/v${VERSION_ID%.*}/releases/x86_64/alpine-minirootfs-${VERSION_ID}-x86_64.tar.gz |tar zxv
run apk add --no-cache --update -p . musl coreutils lvm2 lvm2-extra lvm2-dmeventd udev cryptsetup \
e2fsprogs btrfs-progs lsblk openssh-server openssh-client \
&& rm -rf usr/share/apk var/cache/apk
run apk add --no-cache --update -p . musl coreutils \
lvm2 lvm2-extra lvm2-dmeventd udev cryptsetup \
e2fsprogs lsblk openssh-server \
&& rm -rf usr/share/apk var/cache/apk etc/motd
copy etc/sshd_config etc/ssh/sshd_config
copy --from=rust /init init
run cd bin && for cmd in init-version connect-boot bootstrap; do ln -s ../init $cmd; done
run cd bin && for cmd in init-version init-connect bootstrap; do ln -s ../init $cmd; done
# check viability
run chroot . init-version

7
etc/sshd_config Normal file
View File

@ -0,0 +1,7 @@
KbdInteractiveAuthentication no
PasswordAuthentication no
PubkeyAuthentication yes
PermitRootLogin yes
AllowUsers root

13
src/blockdev.rs Normal file
View File

@ -0,0 +1,13 @@
use tokio::fs;
use tokio::io::{AsyncReadExt, Result};
/// Checks if a block device is uninitialized.
/// It assumed that a initialized device always has a non-zero byte in the first 8kiB.
pub async fn is_uninitialized(dev_path: &str) -> Result<bool> {
let mut dev = fs::File::open(dev_path).await?;
let mut buf = [0u8; 8 << 10];
dev.read_exact(&mut buf).await?;
Ok(buf.into_iter().all(|b| b == 0))
}

View File

@ -1,3 +1,5 @@
pub const TAKE_ALL: i16 = -1;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Config {
pub anti_phishing_code: String,
@ -40,7 +42,7 @@ pub struct Network {
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct NetworkInterface {
pub var: String,
pub n: usize,
pub n: i16,
pub regexps: Vec<String>,
}
@ -91,68 +93,117 @@ impl Default for SSHKeys {
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmVG {
vg: String,
pvs: LvmPV,
#[serde(alias = "vg")]
pub name: String,
pub pvs: LvmPV,
#[serde(default)]
defaults: LvmLVDefaults,
pub defaults: LvmLVDefaults,
lvs: Vec<LvmLV>,
pub lvs: Vec<LvmLV>,
}
#[derive(Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct LvmLVDefaults {
#[serde(default)]
pub fs: Filesystem,
#[serde(default)]
pub raid: Raid,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmLVDefaults {
#[serde(default = "default_fs")]
fs: String,
raid: Option<Raid>,
#[serde(rename_all = "snake_case")]
pub enum Filesystem {
Ext4,
Xfs,
Btrfs,
Other(String),
}
impl Default for LvmLVDefaults {
fn default() -> Self {
Self {
fs: default_fs(),
raid: None,
impl Filesystem {
pub fn fstype(&self) -> &str {
use Filesystem as F;
match self {
F::Ext4 => "ext4",
F::Xfs => "xfs",
F::Btrfs => "btrfs",
F::Other(t) => t,
}
}
}
fn default_fs() -> String {
"ext4".to_string()
impl Default for Filesystem {
fn default() -> Self {
Filesystem::Ext4
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmLV {
name: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
fs: Option<String>,
pub fs: Option<Filesystem>,
#[serde(skip_serializing_if = "Option::is_none")]
raid: Option<Raid>,
#[serde(skip_serializing_if = "Option::is_none")]
size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
extents: Option<String>,
pub raid: Option<Raid>,
#[serde(flatten)]
pub size: LvSize,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum LvSize {
Size(String),
Extents(String),
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct LvmPV {
n: i16,
regexps: Vec<String>,
pub n: i16,
pub regexps: Vec<String>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct CryptDev {
name: String,
dev: Option<String>,
prefix: Option<String>,
pub name: String,
#[serde(flatten)]
pub filter: DevFilter,
pub optional: Option<bool>,
}
impl CryptDev {
pub fn optional(&self) -> bool {
self.optional.unwrap_or_else(|| self.filter.is_prefix())
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DevFilter {
Dev(String),
Prefix(String),
}
impl DevFilter {
pub fn is_dev(&self) -> bool {
match self {
Self::Dev(_) => true,
_ => false,
}
}
pub fn is_prefix(&self) -> bool {
match self {
Self::Prefix(_) => true,
_ => false,
}
}
}
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct Raid {
mirrors: Option<u8>,
stripes: Option<u8>,
pub mirrors: Option<u8>,
pub stripes: Option<u8>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Bootstrap {
dev: String,
seed: String,
pub dev: String,
pub seed: Option<String>,
}

View File

@ -1,4 +1,5 @@
pub mod bootstrap;
pub mod connect_boot;
pub mod init;
pub mod init_input;
pub mod version;

View File

@ -1,195 +1,228 @@
use eyre::{format_err, Result};
use log::{error, info, warn};
use std::collections::BTreeSet as Set;
use std::os::unix::fs::symlink;
use std::{fs, io, process::Command};
use tokio::sync::Mutex;
use tokio::{fs, process::Command};
use crate::{bootstrap::config::Config, cmd::version::version_string, lsblk, lvm};
use crate::{bootstrap::config::Config, cmd::version::version_string, dklog, input};
mod bootstrap;
mod dmcrypt;
mod lvm;
mod networks;
mod sshd;
pub fn run() -> Result<()> {
// devices used by any block target (DM, LVM...)
static USED_DEVS: Mutex<Set<String>> = Mutex::const_new(Set::new());
pub async fn run() {
if std::process::id() != 1 {
return Err(format_err!(
"init must run as PID 1, not {}",
std::process::id()
));
error!("init must run as PID 1, not {}", std::process::id());
std::process::exit(1);
}
unsafe {
use std::env;
env::set_var("PATH", "/bin:/sbin:/usr/bin:/usr/sbin");
env::set_var("HOME", "/root");
}
info!("Welcome to {}", version_string());
let uname = nix::sys::utsname::uname()?;
let uname = nix::sys::utsname::uname().expect("uname should work");
let kernel_version = uname.release().to_string_lossy();
info!("Linux version {kernel_version}");
let cfg: Config = retry(|| {
let cfg = fs::read("config.yaml").map_err(|e| format_err!("failed to read config: {e}"))?;
let cfg: Config = retry(async || {
let cfg = (fs::read("config.yaml").await)
.map_err(|e| format_err!("failed to read config: {e}"))?;
serde_yaml::from_slice(cfg.as_slice())
.map_err(|e| format_err!("failed to parse config: {e}"))
});
})
.await;
info!("config loaded");
// XXX temporary
serde_yaml::to_writer(io::stdout(), &cfg).unwrap();
info!("anti-phishing-code: {}", cfg.anti_phishing_code);
// seem to occasionaly race with Child::wait
// tokio::spawn(child_reaper());
// open input channels
tokio::spawn(crate::input::answer_requests_from_stdin());
tokio::spawn(crate::input::answer_requests_from_socket());
// mount basic filesystems
mount("none", "/proc", "proc", None);
mount("none", "/sys", "sysfs", None);
mount("none", "/dev", "devtmpfs", None);
mount("none", "/dev/pts", "devpts", Some("gid=5,mode=620"));
mount("none", "/proc", "proc", None).await;
mount("none", "/sys", "sysfs", None).await;
mount("none", "/dev", "devtmpfs", None).await;
mount("none", "/dev/pts", "devpts", Some("gid=5,mode=620")).await;
// mount modules
if let Some(ref modules) = cfg.modules {
mount(modules, "/modules", "squashfs", None);
retry_or_ignore(async || {
info!("mounting modules");
mount(modules, "/modules", "squashfs", None).await;
fs::create_dir_all("/lib/modules")?;
fs::create_dir_all("/lib/modules").await?;
let modules_path = &format!("/modules/lib/modules/{kernel_version}");
if !fs::exists(modules_path)? {
let e = format_err!("invalid modules package: {modules_path} should exist");
error!("{e}");
return Err(e);
if !std::fs::exists(modules_path)? {
return Err(format_err!(
"invalid modules package: {modules_path} should exist"
));
}
symlink(modules_path, format!("/lib/modules/{kernel_version}"))?;
Ok(())
})
.await;
} else {
warn!("modules NOT mounted (not configured)");
}
// init devices
info!("initializing devices");
start_daemon("udevd", &[]);
start_daemon("udevd", &[]).await;
exec("udevadm", &["trigger", "-c", "add", "-t", "devices"]);
exec("udevadm", &["trigger", "-c", "add", "-t", "subsystems"]);
exec("udevadm", &["settle"]);
// networks
networks::setup(&cfg)?;
exec("udevadm", &["trigger", "-c", "add", "-t", "devices"]).await;
exec("udevadm", &["trigger", "-c", "add", "-t", "subsystems"]).await;
exec("udevadm", &["settle"]).await;
// Wireguard VPN
// TODO startVPN()
// SSH service
sshd::start(&cfg);
sshd::start(&cfg).await;
// dmcrypt blockdevs
// TODO setupCrypt(cfg.PreLVMCrypt, map[string]string{});
// networks
networks::setup(&cfg).await;
// pre-lvm dmcrypt devs
dmcrypt::setup(&cfg.pre_lvm_crypt).await;
// LVM
// TODO setupLVM(cfg);
lvm::setup(&cfg).await;
// post-lvm dmcrypt devs
dmcrypt::setup(&cfg.crypt).await;
// bootstrap the system
// TODO bootstrap(cfg);
bootstrap::bootstrap(cfg).await;
// finalize
// TODO finalizeBoot();
retry(async || switch_root("/system").await).await;
}
exec_shell();
async fn switch_root(root: &str) -> Result<()> {
use std::ffi::CString;
macro_rules! c {
($s:expr) => {
CString::new($s)?.as_c_str()
};
}
info!("switching root");
dklog::LOG.close().await;
nix::unistd::execv(
c!("/sbin/switch_root"),
&[
c!("switch_root"),
c!("-c"),
c!("/dev/console"),
c!(root),
c!("/sbin/init"),
],
)?;
Ok(())
}
fn mount(src: &str, dst: &str, fstype: &str, opts: Option<&str>) {
if let Err(e) = fs::create_dir_all(dst) {
async fn mount(src: &str, dst: &str, fstype: &str, opts: Option<&str>) {
if let Err(e) = fs::create_dir_all(dst).await {
error!("failed to create dir {dst}: {e}");
}
let mut args = vec![src, dst, "-t", fstype];
match fstype {
"ext4" => {
exec("fsck", &["-p", src]).await;
}
_ => {}
}
let mut args = vec![src, dst];
if !fstype.is_empty() {
args.extend(&["-t", fstype]);
}
if let Some(opts) = opts {
args.extend(["-o", opts]);
}
exec("mount", &args);
exec("mount", &args).await;
}
fn start_daemon(prog: &str, args: &[&str]) {
let cmd_str = sh_str(prog, args);
retry_or_ignore(|| {
async fn start_daemon(prog: &str, args: &[&str]) {
let (cmd_str, mut cmd) = cmd_str(prog, args);
retry_or_ignore(async || {
info!("starting as daemon: {cmd_str}");
let mut cmd = Command::new(prog);
cmd.args(args);
cmd.spawn()?;
Ok(())
});
})
.await;
}
fn exec(prog: &str, args: &[&str]) {
let cmd_str = sh_str(prog, args);
retry_or_ignore(|| {
async fn try_exec(prog: &str, args: &[&str]) -> Result<()> {
let (cmd_str, mut cmd) = cmd_str(prog, args);
info!("# {cmd_str}");
let mut cmd = Command::new(prog);
cmd.args(args);
let s = cmd.status()?;
let s = cmd.status().await?;
if s.success() {
Ok(())
} else {
Err(format_err!("command failed: {s}"))
}
});
}
fn retry_or_ignore(mut action: impl FnMut() -> Result<()>) {
async fn exec(prog: &str, args: &[&str]) {
retry_or_ignore(async || try_exec(prog, args).await).await;
}
async fn retry_or_ignore(mut action: impl AsyncFnMut() -> Result<()>) {
loop {
match action() {
match action().await {
Ok(_) => return,
Err(e) => {
error!("{e}");
loop {
eprint!("[r]etry, [i]gnore, or [s]hell? ");
let mut line = String::new();
io::stdin().read_line(&mut line).unwrap();
match line.trim() {
"r" => break,
"i" => return,
"s" => {
exec_shell();
break;
}
v => {
eprintln!("invalid choice: {v:?}");
}
};
match input::read_choice(["[r]etry", "[i]gnore", "[s]hell"]).await {
'r' => {}
'i' => return,
's' => exec_shell().await,
_ => unreachable!(),
}
}
}
}
}
fn retry<T>(mut action: impl FnMut() -> Result<T>) -> T {
async fn retry<T>(mut action: impl AsyncFnMut() -> Result<T>) -> T {
loop {
match action() {
match action().await {
Ok(v) => return v,
Err(e) => {
error!("{e}");
loop {
eprint!("[r]etry, or [s]hell? ");
let mut line = String::new();
io::stdin().read_line(&mut line).unwrap();
match line.trim() {
"r" => break,
"s" => {
exec_shell();
break;
}
v => {
eprintln!("invalid choice: {v:?}");
}
};
match input::read_choice(["[r]etry", "[s]hell"]).await {
'r' => {}
's' => exec_shell().await,
_ => unreachable!(),
}
}
}
}
}
fn exec_shell() {
async fn exec_shell() {
let mut child = match Command::new("ash").spawn() {
Ok(c) => c,
Err(e) => {
@ -198,10 +231,10 @@ fn exec_shell() {
}
};
let _ = child.wait();
let _ = child.wait().await;
}
fn sh_str(prog: &str, args: &[&str]) -> String {
fn cmd_str(prog: &str, args: &[&str]) -> (String, Command) {
use std::borrow::Cow;
let mut buf = String::new();
@ -213,5 +246,26 @@ fn sh_str(prog: &str, args: &[&str]) -> String {
buf.push_str(&shell_escape::escape(Cow::Borrowed(arg)));
}
buf
let mut cmd = Command::new(prog);
cmd.args(args);
(buf, cmd)
}
#[allow(unused)]
async fn child_reaper() {
use nix::sys::wait::{waitpid, WaitPidFlag};
use nix::unistd::Pid;
use tokio::signal::unix::{signal, SignalKind};
let Ok(mut sigs) =
signal(SignalKind::child()).inspect_err(|e| warn!("failed to setup SIGCHLD handler: {e}"))
else {
return;
};
loop {
sigs.recv().await;
let _ = waitpid(Some(Pid::from_raw(-1)), Some(WaitPidFlag::WNOHANG));
}
}

310
src/cmd/init/bootstrap.rs Normal file
View File

@ -0,0 +1,310 @@
use eyre::{format_err, Result};
use log::{info, warn};
use std::path::Path;
use tokio::{
fs,
io::{AsyncBufReadExt, BufReader},
};
use super::{exec, mount, retry, retry_or_ignore, try_exec};
use crate::bootstrap::config::Config;
use crate::dkl;
pub async fn bootstrap(cfg: Config) {
let bs = cfg.bootstrap;
retry_or_ignore(async || {
fs::create_dir_all("/boostrap").await?;
mount(&bs.dev, "/bootstrap", "auto", None).await;
Ok(())
})
.await;
let boot_version = "current"; // FIXME need to parse kernel cmdline
let base_dir = &format!("/bootstrap/{boot_version}");
retry_or_ignore(async || {
if !fs::try_exists(&base_dir).await? {
info!("creating {base_dir}");
fs::create_dir_all(&base_dir).await?
}
Ok(())
})
.await;
let sys_cfg: dkl::Config = retry(async || {
let sys_cfg_bytes = seed_config(base_dir, &bs.seed).await?;
Ok(serde_yaml::from_slice(&sys_cfg_bytes)?)
})
.await;
mount_system(&sys_cfg, base_dir).await;
retry_or_ignore(async || apply_files(&sys_cfg.files, "/system").await).await;
apply_groups(&sys_cfg.groups, "/system").await;
apply_users(&sys_cfg.users, "/system").await;
// TODO VPNs
mount_filesystems(&sys_cfg.mounts, "/system").await;
retry_or_ignore(async || {
info!("setting up root user");
setup_root_user(&sys_cfg.root_user, "/system").await
})
.await;
}
async fn seed_config(base_dir: &str, seed_url: &Option<String>) -> Result<Vec<u8>> {
let cfg_path = &format!("{base_dir}/config.yaml");
if fs::try_exists(cfg_path).await? {
return Ok(fs::read(cfg_path).await?);
}
let bs_tar = "/bootstrap.tar";
if !fs::try_exists(bs_tar).await? {
if let Some(seed_url) = seed_url.as_ref() {
fetch_bootstrap(seed_url, bs_tar).await?;
} else {
return Err(format_err!(
"no {cfg_path}, no {bs_tar} and no seed, can't bootstrap"
));
}
}
try_exec("tar", &["xf", bs_tar, "-C", base_dir]).await?;
if !fs::try_exists(cfg_path).await? {
return Err(format_err!("{cfg_path} does not exist after seeding"));
}
Ok(fs::read(cfg_path).await?)
}
async fn fetch_bootstrap(seed_url: &str, output_file: &str) -> Result<()> {
let tmp_file = &format!("{output_file}.new");
let _ = fs::remove_file(tmp_file).await;
try_exec("wget", &["-O", tmp_file, seed_url]).await?;
fs::rename(tmp_file, output_file)
.await
.map_err(|e| format_err!("seed rename failed: {e}"))?;
Ok(())
}
async fn mount_system(cfg: &dkl::Config, bs_dir: &str) {
let mem_dir = "/mem";
mount("none", mem_dir, "tmpfs", Some("size=512m")).await;
let layers_dir = &format!("{mem_dir}/layers");
let mut lower_dir = String::new();
for layer in &cfg.layers {
let src = if layer == "modules" {
"/modules.sqfs"
} else {
&format!("{bs_dir}/{layer}.fs")
};
let tgt = &format!("{mem_dir}/{layer}.fs");
retry(async || {
info!("copying layer {layer} from {src}");
fs::copy(src, tgt).await?;
Ok(())
})
.await;
let layer_dir = &format!("{layers_dir}/{layer}");
mount(tgt, layer_dir, "squashfs", None).await;
if !lower_dir.is_empty() {
lower_dir.push(':');
}
lower_dir.push_str(&layer_dir);
}
let upper_dir = &format!("{mem_dir}/upper");
let work_dir = &format!("{mem_dir}/work");
retry_or_ignore(async || {
fs::create_dir_all(upper_dir).await?;
fs::create_dir_all(work_dir).await?;
Ok(())
})
.await;
mount(
"none",
"/system",
"overlay",
Some(&format!(
"lowerdir={lower_dir},upperdir={upper_dir},workdir={work_dir}"
)),
)
.await;
// make root rshared (default in systemd, required by Kubernetes 1.10+)
// equivalent to "mount --make-rshared /"
// see kernel's Documentation/sharedsubtree.txt (search rshared)
retry_or_ignore(async || {
use nix::mount::MsFlags as M;
const NONE: Option<&str> = None;
nix::mount::mount(NONE, "/system", NONE, M::MS_SHARED | M::MS_REC, NONE)?;
Ok(())
})
.await;
}
fn chroot(root: &str, path: &str) -> String {
format!("{root}/{}", path.trim_start_matches(|c| c == '/'))
}
async fn apply_files(files: &[dkl::File], root: &str) -> Result<()> {
for file in files {
let path = chroot(root, &file.path);
let path = Path::new(&path);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
use crate::dkl::FileKind as K;
match &file.kind {
K::Content(content) => fs::write(path, content.as_bytes()).await?,
K::Dir(true) => fs::create_dir(path).await?,
K::Dir(false) => {} // shouldn't happen, but semantic is to ignore
K::Symlink(tgt) => fs::symlink(tgt, path).await?,
}
match file.kind {
K::Symlink(_) => warn!("{}: can set perms on symlink", file.path),
_ => set_perms(path, file.mode).await?,
}
info!("created {}", file.path);
}
Ok(())
}
async fn set_perms(path: impl AsRef<Path>, mode: Option<u32>) -> std::io::Result<()> {
if let Some(mode) = mode.filter(|m| *m != 0) {
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::Permissions::from_mode(mode);
fs::set_permissions(path, mode).await?;
}
Ok(())
}
async fn apply_groups(groups: &[dkl::Group], root: &str) {
for group in groups {
let mut args = vec![root, "groupadd", "-r"];
let gid = group.gid.map(|s| s.to_string());
if let Some(gid) = gid.as_ref() {
args.extend(&["-g", gid]);
}
args.push(group.name.as_str());
exec("chroot", &args).await;
}
}
async fn apply_users(users: &[dkl::User], root: &str) {
for user in users {
let mut args = vec![root, "useradd", "-r"];
let uid = user.uid.map(|s| s.to_string());
if let Some(uid) = uid.as_ref() {
args.extend(&["-u", uid]);
}
let gid = user.gid.map(|s| s.to_string());
if let Some(gid) = gid.as_ref() {
args.extend(&["-g", gid]);
}
args.push(user.name.as_str());
exec("chroot", &args).await;
}
}
async fn mount_filesystems(mounts: &[dkl::Mount], root: &str) {
for m in mounts {
let path = chroot(root, &m.path);
mount(
&m.dev,
&path,
m.r#type.as_ref().map_or("", |v| v.as_str()),
m.options
.as_ref()
.filter(|v| !v.is_empty())
.map(|s| s.as_str()),
)
.await;
}
}
async fn setup_root_user(user: &dkl::RootUser, root: &str) -> Result<()> {
if let Some(pw_hash) = user.password_hash.as_ref().filter(|v| !v.is_empty()) {
set_user_password("root", &pw_hash, root).await?;
}
let mut authorized_keys = Vec::new();
for ak in &user.authorized_keys {
authorized_keys.extend(ak.as_bytes());
authorized_keys.push(b'\n');
}
let ssh_dir = &chroot(root, "root/.ssh");
fs::create_dir_all(ssh_dir)
.await
.map_err(|e| format_err!("mkdir -p {ssh_dir} failed: {e}"))?;
set_perms(ssh_dir, Some(0o700))
.await
.map_err(|e| format_err!("chmod {ssh_dir} failed: {e}"))?;
let ak_path = &format!("{ssh_dir}/authorized_keys");
fs::write(ak_path, authorized_keys)
.await
.map_err(|e| format_err!("write {ak_path} failed: {e}"))?;
Ok(())
}
async fn set_user_password(user: &str, password_hash: &str, root: &str) -> Result<()> {
info!("setting password for {user}");
let user = user.as_bytes();
let password_hash = password_hash.as_bytes();
let mut buf = Vec::new();
let pw_file = &chroot(root, "etc/shadow");
let rd = fs::File::open(pw_file)
.await
.map_err(|e| format_err!("open {pw_file} failed: {e}"))?;
let mut rd = BufReader::new(rd);
let mut line = Vec::new();
while (rd.read_until(b'\n', &mut line).await)
.map_err(|e| format_err!("read {pw_file} failed: {e}"))?
!= 0
{
let mut split: Vec<_> = line.split(|c| *c == b':').collect();
if split.len() > 2 && split[0] == user {
split[1] = password_hash;
buf.extend(split.join(&b':'));
} else {
buf.extend(&line);
}
line.clear();
}
fs::write(pw_file, buf).await?;
Ok(())
}

142
src/cmd/init/dmcrypt.rs Normal file
View File

@ -0,0 +1,142 @@
use eyre::{format_err, Result};
use log::{error, info};
use std::collections::BTreeSet as Set;
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::Mutex;
use super::{retry_or_ignore, USED_DEVS};
use crate::blockdev::is_uninitialized;
use crate::bootstrap::config::{CryptDev, DevFilter};
use crate::fs::walk_dir;
use crate::input;
pub async fn setup(devs: &[CryptDev]) {
if devs.is_empty() {
return;
}
let mut used_devs = USED_DEVS.lock().await;
// CryptDev.name that have a least one assignment done
let mut done = Set::new();
// dmcrypt devices opened here
let mut done_crypt = Set::new();
retry_or_ignore(async || {
let all_devs = walk_dir("/dev").await;
for dev in devs {
let mut mappings = find_dev(dev, &all_devs);
mappings.retain(|(_, dev_path)| !used_devs.contains(dev_path));
if mappings.is_empty() && !dev.optional() && !done.contains(&dev.name) {
return Err(format_err!("no device found for crypt dev {}", dev.name));
}
for (crypt_dev, dev_path) in mappings {
if done_crypt.contains(&crypt_dev) {
continue;
}
info!("crypt dev {crypt_dev}: using {dev_path}");
crypt_open(&crypt_dev, &dev_path).await?;
done_crypt.insert(crypt_dev);
used_devs.insert(dev_path);
done.insert(&dev.name);
}
}
Ok(())
})
.await;
}
static PREV_PW: Mutex<String> = Mutex::const_new(String::new());
async fn crypt_open(crypt_dev: &str, dev_path: &str) -> Result<()> {
'open_loop: loop {
let mut prev_pw = PREV_PW.lock().await;
let prompt = if prev_pw.is_empty() {
format!("crypt password for {crypt_dev}? ")
} else {
format!("crypt password for {crypt_dev} (enter = reuse previous)? ")
};
let mut pw = input::read_password(prompt).await;
if pw.is_empty() {
pw = prev_pw.clone();
}
if pw.is_empty() {
error!("empty password provided!");
continue;
}
*prev_pw = pw.clone();
if cryptsetup(&pw, ["open", dev_path, crypt_dev]).await? {
return Ok(());
}
error!("crypt open {crypt_dev} from {dev_path} failed");
if is_uninitialized(dev_path).await? {
// we can format the device
match input::read_choice(["[f]ormat", "[r]etry", "[i]gnore"]).await {
'r' => continue 'open_loop,
'i' => return Ok(()),
'f' => {
if !cryptsetup(&pw, ["luksFormat", dev_path]).await? {
return Err(format_err!("cryptsetup luksFormat failed"));
}
if !cryptsetup(&pw, ["open", dev_path, crypt_dev]).await? {
return Err(format_err!("open after format failed"));
}
return Ok(());
}
_ => unreachable!(),
}
} else {
// device looks initialized, don't allow format
match input::read_choice(["[r]etry", "[i]gnore"]).await {
'r' => continue 'open_loop,
'i' => return Ok(()),
_ => unreachable!(),
}
}
}
}
async fn cryptsetup<const N: usize>(pw: &str, args: [&str; N]) -> Result<bool> {
let mut child = Command::new("cryptsetup")
.args(args)
.arg("--key-file=-")
.stdin(Stdio::piped())
.spawn()?;
(child.stdin.as_mut().unwrap())
.write_all(pw.as_bytes())
.await?;
Ok(child.wait().await?.success())
}
fn find_dev(dev: &CryptDev, all_devs: &[String]) -> Vec<(String, String)> {
let dev_name = &dev.name;
match dev.filter {
DevFilter::Dev(ref path) => (all_devs.iter())
.filter(|dev_path| dev_path == &path)
.map(|dev_path| (dev.name.clone(), dev_path.clone()))
.collect(),
DevFilter::Prefix(ref prefix) => (all_devs.iter())
.filter_map(|path| {
let suffix = path.strip_prefix(prefix)?;
Some((format!("{dev_name}{suffix}"), path.clone()))
})
.collect(),
}
}

208
src/cmd/init/lvm.rs Normal file
View File

@ -0,0 +1,208 @@
use eyre::{format_err, Result};
use log::{error, info};
use tokio::process::Command;
use super::{exec, retry, retry_or_ignore, USED_DEVS};
use crate::bootstrap::config::{Config, Filesystem, LvSize, LvmLV, LvmVG, TAKE_ALL};
use crate::fs::walk_dir;
use crate::{blockdev, lvm};
pub async fn setup(cfg: &Config) {
if cfg.lvm.is_empty() {
info!("no LVM VG configured");
return;
}
exec("pvscan", &[]).await;
exec("vgscan", &["--mknodes"]).await;
for vg in &cfg.lvm {
retry_or_ignore(async || setup_vg(vg).await).await
}
let lvs = retry(lvm::lvs).await;
for vg in &cfg.lvm {
let vg_name = vg.name.as_str();
for lv in &vg.lvs {
let lv_name = lv.name.as_str();
if (lvs.iter()).any(|lv| lv.equal_name(vg_name, lv_name)) {
info!("LVM LV {vg_name}/{lv_name} exists");
} else {
retry_or_ignore(async || setup_lv(&vg, &lv).await).await;
}
}
}
exec("vgchange", &["--sysinit", "-a", "ly"]).await;
for vg in &cfg.lvm {
for lv in &vg.lvs {
retry_or_ignore(async || format_lv(&vg, &lv).await).await;
}
}
}
async fn setup_vg(vg: &LvmVG) -> Result<()> {
let vg_name = vg.name.as_str();
let pvs = retry(lvm::pvs).await;
let mut dev_done = pvs.iter().filter(|pv| pv.vg_name == vg.name).count();
let dev_needed = vg.pvs.n;
macro_rules! missing_count {
() => {
(dev_needed as usize) - dev_done
};
}
if dev_needed == TAKE_ALL {
if dev_done == 0 {
info!("setting up LVM VG {vg_name} using all matching devices");
} else {
// in "take all" mode, don't extend as existing vg at boot
info!("LVM VG {vg_name} exists");
return Ok(());
}
} else if dev_done >= (dev_needed as usize) {
info!("LVM VG {vg_name} exists with enough devices");
return Ok(()); // already set up
} else {
info!("setting up LVM VG {vg_name} ({dev_done}/{dev_needed} devices configured)");
}
let regexps: Vec<regex::Regex> = (vg.pvs.regexps.iter())
.filter_map(|re_str| {
(re_str.parse())
.inspect_err(|e| error!("invalid regex ignored: {re_str:?}: {e}"))
.ok()
})
.collect();
let mut used_devs = USED_DEVS.lock().await;
let matching_devs = (walk_dir("/dev").await.into_iter())
.filter(|path| !used_devs.contains(path.as_str()))
.filter(|path| regexps.iter().any(|re| re.is_match(path)));
let devs: Vec<_> = if dev_needed == TAKE_ALL {
matching_devs.collect()
} else {
matching_devs.take(missing_count!()).collect()
};
let cmd = if dev_done == 0 {
"vgcreate"
} else {
"vgextend"
};
let status = (Command::new(cmd).arg(vg_name).args(&devs))
.status()
.await?;
if !status.success() {
return Err(format_err!("{cmd} failed: {status}"));
}
dev_done += devs.len();
used_devs.extend(devs);
if dev_needed != TAKE_ALL && dev_done < (dev_needed as usize) {
return Err(format_err!(
"LVM VG {vg_name} needs {} more device(s)",
missing_count!()
));
}
Ok(())
}
async fn setup_lv(vg: &LvmVG, lv: &LvmLV) -> Result<()> {
let name = format!("{}/{}", vg.name, lv.name);
info!("creating LV {name}");
let mut cmd = Command::new("lvcreate");
cmd.arg(&vg.name);
cmd.args(&["--name", &lv.name]);
match &lv.size {
LvSize::Size(sz) => cmd.args(&["-L", sz]),
LvSize::Extents(sz) => cmd.args(&["-l", sz]),
};
let raid = lv.raid.as_ref().unwrap_or(&vg.defaults.raid);
if let Some(mirrors) = raid.mirrors {
cmd.args(&["--mirrors", &mirrors.to_string()]);
}
if let Some(stripes) = raid.stripes {
cmd.args(&["--stripes", &stripes.to_string()]);
}
let status = cmd.status().await?;
if !status.success() {
return Err(format_err!("lvcreate failed: {status}"));
}
Ok(())
}
async fn format_lv(vg: &LvmVG, lv: &LvmLV) -> Result<()> {
let name = &format!("{}/{}", vg.name, lv.name);
let dev = &format!("/dev/{name}");
if !blockdev::is_uninitialized(&dev).await? {
info!("{dev} looks initialized");
return Ok(());
}
let fs = lv.fs.as_ref().unwrap_or(&vg.defaults.fs);
info!("initializing {} filesystem on {dev}", fs.fstype());
let mkfs = format!("mkfs.{}", fs.fstype());
let mut cmd = Command::new(&mkfs);
// filesystem specific flags
match fs {
Filesystem::Ext4 => {
cmd.arg("-F");
}
Filesystem::Btrfs | Filesystem::Xfs => {
cmd.arg("-f");
}
&Filesystem::Other(_) => {}
}
cmd.arg(dev);
let mut child = match cmd.spawn() {
Ok(v) => v,
Err(e) => {
// try simple fixes
match fs {
Filesystem::Xfs => install_package("xfsprogs").await?,
Filesystem::Btrfs => install_package("btrs-progs").await?,
_ => Err(format_err!("{mkfs} failed: {e}"))?,
}
cmd.spawn().map_err(|e| format_err!("{mkfs} failed: {e}"))?
}
};
let status = child.wait().await?;
if !status.success() {
return Err(format_err!("{mkfs} failed: {status}"));
}
Ok(())
}
async fn install_package(pkg: &str) -> Result<()> {
let status = Command::new("apk").arg("add").arg(pkg).status().await?;
if status.success() {
Ok(())
} else {
Err(format_err!("failed to install package {pkg}: {status}"))
}
}

View File

@ -1,24 +1,29 @@
use itertools::Itertools;
use log::{info, warn};
use std::collections::BTreeSet as Set;
use std::process::Command;
use tokio::process::Command;
use super::{format_err, retry_or_ignore, Config, Result};
use crate::{
bootstrap::config,
udev,
utils::{select_n_by_regex, NameAliases},
};
pub fn setup(cfg: &Config) -> Result<()> {
pub async fn setup(cfg: &Config) {
if cfg.networks.is_empty() {
warn!("no networks configured");
return Ok(());
return;
}
let mut assigned: Set<String> = Set::new();
let mut assigned = Set::new();
for net in &cfg.networks {
retry_or_ignore(|| {
retry_or_ignore(async || setup_network(net, &mut assigned).await).await;
}
}
async fn setup_network(net: &config::Network, assigned: &mut Set<String>) -> Result<()> {
info!("setting up network {}", net.name);
let netdevs = get_interfaces()?
@ -57,16 +62,12 @@ pub fn setup(cfg: &Config) -> Result<()> {
}
info!("- running script");
let status = cmd.status()?;
let status = cmd.status().await?;
if !status.success() {
return Err(format_err!("setup script failed: {status}"));
}
assigned.extend(selected);
Ok(())
});
}
Ok(())
}

View File

@ -1,15 +1,15 @@
use log::{info, warn};
use std::fs;
use std::io::Write;
use std::net;
use std::os::unix::fs::PermissionsExt;
use std::process::{Command, Stdio};
use std::thread;
use std::process::Stdio;
use tokio::net;
use tokio::process::Command;
use super::{format_err, retry_or_ignore, Config};
use super::{retry_or_ignore, Config};
pub fn start(cfg: &Config) {
retry_or_ignore(|| {
pub async fn start(cfg: &Config) {
retry_or_ignore(async || {
info!("ssh: writing authorized keys");
let ssh_dir = "/root/.ssh";
@ -26,9 +26,10 @@ pub fn start(cfg: &Config) {
fs::write(authorized_keys, ak)?;
Ok(())
});
})
.await;
retry_or_ignore(|| {
retry_or_ignore(async || {
let mut sshd_args = Vec::new();
sshd_args.extend(["-i", "-E", "/var/log/sshd.log"]);
@ -43,23 +44,27 @@ pub fn start(cfg: &Config) {
let sshd_args = sshd_args.into_iter().map(String::from).collect();
// don't pre-start ssd as it should rarely be useful at this stage, use inetd-style.
// don't pre-start sshd as it should rarely be useful at this stage, use inetd-style.
let listen_addr = cfg.ssh.listen.clone();
info!("ssh: starting listener on {listen_addr}");
let listener = net::TcpListener::bind(listen_addr)?;
let listener = net::TcpListener::bind(listen_addr).await?;
thread::spawn(move || handle_ssh_connections(listener, sshd_args));
tokio::spawn(handle_ssh_connections(listener, sshd_args));
Ok(())
});
})
.await;
}
fn handle_ssh_connections(listener: net::TcpListener, sshd_args: Vec<String>) {
async fn handle_ssh_connections(listener: net::TcpListener, sshd_args: Vec<String>) {
loop {
let Ok((stream, remote)) = listener.accept() else {
warn!("ssh: listener stopped");
break;
let (stream, remote) = match listener.accept().await {
Ok(v) => v,
Err(e) => {
warn!("ssh: listener stopped: {e}");
return;
}
};
info!("ssh: new connection from {remote}");
@ -74,8 +79,13 @@ fn handle_ssh_connections(listener: net::TcpListener, sshd_args: Vec<String>) {
cmd.stdout(unsafe { Stdio::from_raw_fd(fd) });
cmd.stderr(Stdio::null());
if let Err(e) = cmd.spawn() {
match cmd.spawn() {
Ok(mut child) => {
tokio::spawn(async move { child.wait().await });
}
Err(e) => {
warn!("ssh: failed to start server: {e}");
};
}
}
}
}

11
src/cmd/init_input.rs Normal file
View File

@ -0,0 +1,11 @@
use crate::input;
pub async fn run() {
tokio::spawn(async {
if let Err(e) = input::forward_requests_from_socket().await {
eprintln!("failed to forwards requests from socket: {e}");
std::process::exit(1);
}
});
input::answer_requests_from_stdin().await;
}

57
src/dkl.rs Normal file
View File

@ -0,0 +1,57 @@
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Config {
pub layers: Vec<String>,
pub root_user: RootUser,
pub mounts: Vec<Mount>,
pub files: Vec<File>,
pub groups: Vec<Group>,
pub users: Vec<User>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct RootUser {
#[serde(skip_serializing_if = "Option::is_none")]
pub password_hash: Option<String>,
pub authorized_keys: Vec<String>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Mount {
pub r#type: Option<String>,
pub dev: String,
pub path: String,
pub options: Option<String>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Group {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub gid: Option<u32>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct User {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub uid: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gid: Option<u32>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct File {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<u32>,
#[serde(flatten)]
pub kind: FileKind,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FileKind {
Content(String),
Symlink(String),
Dir(bool),
}

136
src/dklog.rs Normal file
View File

@ -0,0 +1,136 @@
use std::io::Write;
use std::sync::{LazyLock, Mutex};
use std::time::SystemTime;
use tokio::sync::watch;
use tokio::task::JoinSet;
pub static LOG: LazyLock<Log> = LazyLock::new(Log::new);
pub fn init() {
log::set_logger(&*LOG).expect("set_logger should not fail");
log::set_max_level(log::LevelFilter::Info);
}
pub struct Log {
start: SystemTime,
log: Mutex<Vec<u8>>,
tx: Mutex<Option<watch::Sender<usize>>>,
rx: watch::Receiver<usize>,
tasks: Mutex<Option<JoinSet<()>>>,
}
impl Log {
pub fn new() -> Self {
let (tx, rx) = watch::channel(0);
Self {
start: SystemTime::now(),
log: Mutex::new(Vec::new()),
tx: Mutex::new(Some(tx)),
rx,
tasks: Mutex::new(Some(JoinSet::new())),
}
}
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
if let Some(tasks) = self.tasks.lock().unwrap().as_mut() {
tasks.spawn(task);
}
}
pub fn subscribe(&self) -> LogWatch {
LogWatch {
log: self,
pos: 0,
rx: self.rx.clone(),
}
}
pub async fn copy_to<W: tokio::io::AsyncWrite + Unpin>(&self, mut out: W) {
let mut log = self.subscribe();
use tokio::io::AsyncWriteExt;
while let Some(chunk) = log.next().await {
let _ = out.write_all(&chunk).await;
let _ = out.flush().await;
}
}
pub async fn close(&self) {
self.tx.lock().unwrap().take();
if let Some(tasks) = self.tasks.lock().unwrap().take() {
tasks.join_all().await;
}
}
}
impl log::Log for Log {
fn enabled(&self, metadata: &log::Metadata) -> bool {
use log::Level::*;
match metadata.level() {
Trace | Debug => false,
Info | Warn | Error => true,
}
}
fn log(&self, record: &log::Record) {
if !self.enabled(record.metadata()) {
return;
}
let ts = self.start.elapsed().unwrap_or_default();
use log::Level::*;
let level = match record.level() {
Trace => "\x1b[2mTRC\x1b[0m",
Debug => "\x1b[34mDBG\x1b[0m",
Info => "\x1b[32mINF\x1b[0m",
Warn => "\x1b[93mWRN\x1b[0m",
Error => "\x1b[91mERR\x1b[0m",
};
let Ok(mut log) = self.log.lock() else {
return;
};
writeln!(
log,
"{:3}.{:03} {level} {}",
ts.as_secs(),
ts.subsec_millis(),
record.args()
)
.expect("write to buf should not fail");
if let Some(tx) = self.tx.lock().unwrap().as_mut() {
tx.send_replace(log.len());
}
}
fn flush(&self) {
// always flushed
}
}
pub struct LogWatch<'t> {
log: &'t Log,
pos: usize,
rx: watch::Receiver<usize>,
}
impl<'t> LogWatch<'t> {
pub async fn next(&mut self) -> Option<Vec<u8>> {
loop {
let new_pos = self.rx.borrow_and_update().clone();
if new_pos <= self.pos {
if self.rx.changed().await.is_err() {
return None; // finished
}
continue;
}
let mut chunk = Vec::new();
chunk.extend(&self.log.log.lock().unwrap()[self.pos..new_pos]);
self.pos = new_pos;
return Some(chunk);
}
}
}

43
src/fs.rs Normal file
View File

@ -0,0 +1,43 @@
use log::warn;
use std::io::Result;
use tokio::fs;
pub async fn walk_dir(dir: &str) -> Vec<String> {
let mut todo = Vec::new();
if let Ok(rd) = read_dir(dir).await {
todo.push(rd);
}
let mut results = Vec::new();
while let Some(rd) = todo.pop() {
for (path, is_dir) in rd {
if is_dir {
let Ok(child_rd) = (read_dir(&path).await)
.inspect_err(|e| warn!("reading dir {path} failed: {e}"))
else {
continue;
};
todo.push(child_rd);
} else {
results.push(path);
}
}
}
results
}
async fn read_dir(dir: &str) -> Result<Vec<(String, bool)>> {
let mut rd = fs::read_dir(dir).await?;
let mut entries = Vec::new();
while let Some(entry) = rd.next_entry().await? {
if let Some(path) = entry.path().to_str() {
entries.push((path.to_string(), entry.file_type().await?.is_dir()));
}
}
entries.sort(); // we want a deterministic & intuitive order
Ok(entries)
}

277
src/input.rs Normal file
View File

@ -0,0 +1,277 @@
use log::warn;
use std::fmt::Display;
use std::sync::{Arc, LazyLock};
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net;
use tokio::sync::{oneshot, watch, Mutex};
pub async fn read_line(prompt: impl Display) -> String {
read(prompt, false).await
}
pub async fn read_password(prompt: impl Display) -> String {
read(prompt, true).await
}
fn choice_char(s: &str) -> char {
s.chars().skip_while(|c| *c != '[').skip(1).next().unwrap()
}
#[test]
fn test_choice_char() {
assert_eq!('r', choice_char("[r]etry"));
assert_eq!('b', choice_char("re[b]boot"));
}
/// ```no_run
/// use init::input;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
/// tokio::spawn(input::answer_requests_from_stdin());
/// match input::read_choice(["[r]etry","[i]gnore","re[b]oot"]).await {
/// 'r' => todo!(),
/// 'i' => todo!(),
/// 'b' => todo!(),
/// _ => unreachable!(),
/// }
/// }
/// ```
pub async fn read_choice<const N: usize>(choices: [&str; N]) -> char {
let chars = choices.map(choice_char);
let mut prompt = String::new();
for s in choices {
if !prompt.is_empty() {
prompt.push_str(", ");
}
prompt.push_str(s);
}
prompt.push_str("? ");
loop {
let line = read_line(&prompt).await;
let Some(ch) = line.chars().nth(0) else {
continue;
};
for choice in chars {
if ch == choice {
return choice;
}
}
}
}
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub struct InputRequest {
prompt: String,
hide: bool,
}
pub type Reply = Arc<Mutex<Option<oneshot::Sender<String>>>>;
static REQ: LazyLock<Mutex<watch::Sender<Option<(InputRequest, Reply)>>>> = LazyLock::new(|| {
let (tx, _) = watch::channel(None);
Mutex::new(tx)
});
static READ_MUTEX: Mutex<()> = Mutex::const_new(());
async fn read(prompt: impl Display, hide_input: bool) -> String {
let _read_lock = READ_MUTEX.lock();
let req = InputRequest {
prompt: prompt.to_string(),
hide: hide_input,
};
let (tx, rx) = oneshot::channel();
let reply = Arc::new(Mutex::new(Some(tx)));
REQ.lock().await.send_replace(Some((req, reply)));
let input = rx.await.expect("reply sender should not be closed");
REQ.lock().await.send_replace(None);
input
}
pub async fn answer_requests_from_stdin() {
let mut stdin = BufReader::new(io::stdin()).lines();
let mut stdout = io::stdout();
let mut current_req = REQ.lock().await.subscribe();
current_req.mark_changed();
loop {
// TODO check is stdin has been closed (using C-c is enough for now)
(current_req.changed().await).expect("input request should not close");
let Some((req, reply)) = current_req.borrow_and_update().clone() else {
continue;
};
// handle hide
let mut saved_termios = None;
if req.hide {
match termios::Termios::from_fd(0) {
Ok(mut tio) => {
saved_termios = Some(tio.clone());
tio.c_lflag &= !termios::ECHO;
if let Err(e) = termios::tcsetattr(0, termios::TCSAFLUSH, &tio) {
warn!("password may be echoed! {e}");
}
}
Err(e) => {
warn!("password may be echoed! {e}");
}
}
}
// print the prompt and wait for user input
stdout.write_all(req.prompt.as_bytes()).await.unwrap();
stdout.flush().await.unwrap();
tokio::select!(
r = stdin.next_line() => {
let Ok(Some(line)) = r else {
warn!("stdin closed");
return;
};
if let Some(tx) = reply.lock().await.take() {
let _ = tx.send(line);
}
if saved_termios.is_some() {
// final '\n' is hidden too so fix it
stdout.write_all(b"\n").await.unwrap();
stdout.flush().await.unwrap();
}
}
_ = current_req.changed() => {
// reply came from somewhere else
stdout.write_all(b"<answered>\n").await.unwrap();
stdout.flush().await.unwrap();
current_req.mark_changed();
}
);
// restore term if input was hidden
if let Some(tio) = saved_termios {
if let Err(e) = termios::tcsetattr(0, termios::TCSAFLUSH, &tio) {
warn!("failed to restore pty attrs: {e}");
}
}
}
}
const SOCKET_PATH: &str = "/run/init.sock";
pub async fn answer_requests_from_socket() {
let Ok(listener) = net::UnixListener::bind(SOCKET_PATH)
.inspect_err(|e| warn!("failed start input socket listener: {e}"))
else {
return;
};
loop {
let Ok((conn, _)) = (listener.accept())
.await
.inspect_err(|e| warn!("input socket listener failed: {e}"))
else {
return;
};
tokio::spawn(handle_connection(conn));
}
}
async fn handle_connection(conn: net::UnixStream) {
let mut current_req = REQ.lock().await.subscribe();
current_req.mark_changed();
let (rd, mut wr) = io::split(conn);
let mut rd = BufReader::new(rd).lines();
loop {
(current_req.changed().await).expect("input request should not close");
let Some((req, reply)) = current_req.borrow_and_update().clone() else {
if wr.write_all(b"null\n").await.is_err() {
return;
}
if wr.flush().await.is_err() {
return;
}
continue;
};
{
let mut buf = serde_json::to_vec(&req).unwrap();
buf.push(b'\n');
if (wr.write_all(&buf).await).is_err() {
return;
}
if wr.flush().await.is_err() {
return;
}
}
tokio::select!(
r = rd.next_line() => {
let Ok(Some(line)) = r else {
return; // closed
};
if let Some(tx) = reply.lock().await.take() {
let _ = tx.send(line);
}
}
_ = current_req.changed() => {
// reply came from somewhere else
current_req.mark_changed();
}
);
}
}
pub async fn forward_requests_from_socket() -> eyre::Result<()> {
let stream = net::UnixStream::connect(SOCKET_PATH).await?;
let (rd, mut wr) = io::split(stream);
let mut rd = BufReader::new(rd).lines();
let mut line = rd.next_line().await?;
loop {
let Some(req) = line else {
return Ok(());
};
let req: Option<InputRequest> = serde_json::from_str(&req)?;
let Some(req) = req else {
// request answered from somewhere else
REQ.lock().await.send_replace(None);
line = rd.next_line().await?;
continue;
};
tokio::select!(
mut r = read(req.prompt, req.hide) => {
r.push('\n');
wr.write_all(r.as_bytes()).await?;
wr.flush().await?;
line = rd.next_line().await?;
}
l = rd.next_line() => {
line = l?;
}
);
}
}

View File

@ -4,3 +4,8 @@ pub mod lsblk;
pub mod lvm;
pub mod udev;
pub mod utils;
pub mod input;
pub mod blockdev;
pub mod fs;
pub mod dkl;
pub mod dklog;

View File

@ -1,5 +1,5 @@
use std::io;
use std::process::Command;
use eyre::{format_err, Result};
use tokio::process::Command;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Report {
@ -7,11 +7,13 @@ struct Report {
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
enum ReportObj {
PV { pv: Vec<PV> },
VG { vg: Vec<VG> },
LV { lv: Vec<LV> },
#[serde(rename = "pv")]
PV(Vec<PV>),
#[serde(rename = "vg")]
VG(Vec<VG>),
#[serde(rename = "lv")]
LV(Vec<LV>),
}
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@ -37,47 +39,63 @@ pub struct VG {
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LV {
lv_name: String,
vg_name: String,
lv_attr: String,
lv_size: String,
pool_lv: String,
origin: String,
data_percent: String,
metadata_percent: String,
move_pv: String,
mirror_log: String,
copy_percent: String,
convert_lv: String,
pub lv_name: String,
pub vg_name: String,
pub lv_attr: String,
pub lv_size: String,
pub pool_lv: String,
pub origin: String,
pub data_percent: String,
pub metadata_percent: String,
pub move_pv: String,
pub mirror_log: String,
pub copy_percent: String,
pub convert_lv: String,
}
pub fn pvs() -> io::Result<Vec<PV>> {
impl LV {
pub fn equal_name(&self, vg_name: &str, lv_name: &str) -> bool {
vg_name == &self.vg_name && lv_name == &self.lv_name
}
}
pub async fn pvs() -> Result<Vec<PV>> {
report_cmd("pvs", |o| match o {
ReportObj::PV { pv } => Some(pv),
ReportObj::PV(pv) => Some(pv),
_ => None,
})
.await
}
pub fn vgs() -> io::Result<Vec<VG>> {
pub async fn vgs() -> Result<Vec<VG>> {
report_cmd("vgs", |o| match o {
ReportObj::VG { vg } => Some(vg),
ReportObj::VG(vg) => Some(vg),
_ => None,
})
.await
}
pub fn lvs() -> io::Result<Vec<LV>> {
pub async fn lvs() -> Result<Vec<LV>> {
report_cmd("lvs", |o| match o {
ReportObj::LV { lv } => Some(lv),
ReportObj::LV(lv) => Some(lv),
_ => None,
})
.await
}
fn report_cmd<T>(cmd: &str, find: fn(ReportObj) -> Option<Vec<T>>) -> io::Result<Vec<T>> {
let output = Command::new(cmd).arg("--reportformat=json").output()?;
let report: Report = serde_json::from_slice(output.stdout.as_slice()).unwrap();
Ok(report
.report
.into_iter()
async fn report_cmd<T>(cmd: &str, find: fn(ReportObj) -> Option<Vec<T>>) -> Result<Vec<T>> {
let output = Command::new(cmd)
.arg("--reportformat=json")
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr.trim_ascii());
return Err(format_err!("{cmd} failed: {}", stderr));
}
let report: Report = serde_json::from_slice(&output.stdout).unwrap();
Ok((report.report.into_iter())
.filter_map(find)
.flatten()
.collect())

View File

@ -1,33 +1,42 @@
use eyre::Result;
use log::error;
use log::warn;
use std::env;
use std::process::exit;
use init::cmd;
use init::dklog;
fn main() -> Result<()> {
env_logger::builder()
.format_timestamp_millis()
.format_target(false)
.filter_level(log::LevelFilter::Info)
.write_style(env_logger::WriteStyle::Always)
.init();
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
dklog::LOG.spawn(dklog::LOG.copy_to(tokio::io::stderr()));
unsafe {
env::set_var("PATH", "/bin:/sbin:/usr/bin:/usr/sbin");
}
dklog::init();
let call_name = env::args().next().unwrap_or("init".into());
let call_name = (call_name.rsplit_once('/').map(|(_, n)| n)).unwrap_or(call_name.as_str());
if call_name == "init" {
dklog::LOG.spawn(async {
let Ok(log_file) = (tokio::fs::File::create("/var/log/init.log").await)
.inspect_err(|e| warn!("failed to open init.log: {e}"))
else {
return;
};
dklog::LOG.copy_to(log_file).await;
});
}
match call_name {
"init" => cmd::init::run()?,
"init" => cmd::init::run().await,
"init-version" => cmd::version::run(),
"init-connect" => cmd::init_input::run().await,
_ => {
error!("invalid call name: {call_name:?}");
eprintln!("invalid call name: {call_name:?}");
exit(1);
}
};
dklog::LOG.close().await;
Ok(())
}

View File

@ -34,7 +34,7 @@ impl NameAliases {
}
pub fn select_n_by_regex<'t>(
n: usize,
n: i16,
regexs: &Vec<String>,
nas: impl Iterator<Item = &'t NameAliases>,
) -> Vec<String> {
@ -53,9 +53,9 @@ pub fn select_n_by_regex<'t>(
.filter(|na| na.iter().any(matching))
.map(|na| na.name().to_string());
if n <= 0 {
if n == -1 {
nas.collect()
} else {
nas.take(n).collect()
nas.take(n as usize).collect()
}
}

9
test-initrd/id_ecdsa Normal file
View File

@ -0,0 +1,9 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAaAAAABNlY2RzYS
1zaGEyLW5pc3RwMjU2AAAACG5pc3RwMjU2AAAAQQSlevO48OA0/yB8zTP1naijVU3wsB0c
4s6Jc/8y7YwNq51/SlieeGeRFcCSke5Z67rNX2JFiv+tsGQB6TczZm/6AAAAqDU5b2c1OW
9nAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBKV687jw4DT/IHzN
M/WdqKNVTfCwHRzizolz/zLtjA2rnX9KWJ54Z5EVwJKR7lnrus1fYkWK/62wZAHpNzNmb/
oAAAAhANenGXlP1ZPgcyb/+O57MbpGDOZS4e7UNiyvkD8I3hNnAAAACW53cmtAbndyawEC
AwQFBg==
-----END OPENSSH PRIVATE KEY-----

7
test-initrd/id_ed25519 Normal file
View File

@ -0,0 +1,7 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACCCMX0zm3Fi7vXWevHuJvzU30l5F06NAC2qloUewVSoDQAAAJDnq/y456v8
uAAAAAtzc2gtZWQyNTUxOQAAACCCMX0zm3Fi7vXWevHuJvzU30l5F06NAC2qloUewVSoDQ
AAAEDXYq507MsjSN34Qw87guf3d5D4Dt2IrF788CeBcYSNe4IxfTObcWLu9dZ68e4m/NTf
SXkXTo0ALaqWhR7BVKgNAAAACW53cmtAbndyawECAwQ=
-----END OPENSSH PRIVATE KEY-----

View File

@ -1 +0,0 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDLo+mv+0GJViwFiK14bAw8j4HYFIGf4I/fpmIqzHohf70TT+cYqAKSHBYV+uYjNLYQib6Ii/thPexeypw73pq7Z7CHvubnFQUiuqJ7lzXeV4146f5eHL76/TvTmKthS4FVFta6SlCOE6unR6q8HM52VSijavtoX3musxugCwhaHmBNOdUJcNIRCIey8QaOztPK05dDrOZhMQJgSST/BRaNrtY0/xRmJo+5TbQeyjyDuQdK2EoS51QMWzsT/LH6drQBgKd8RRHqjEhscfaV2CmdfuVO/liEdW82epMgFCGYtMetP/rs3bPkC90ULxPZSytaz7d5ux1dvSgrPHzX4306k/GP3d6EvOedy4IKAB53J7lebvrRI5pTVPZvd/RsSGGxUIwjf2Y8TF5nbC2d2D5Oauqfevn29veIRh8mq+AsJMnkvUFwVRN+6WDsZ+F82+AGaCKJFNQLYbRKtXW0zzi+wTsnLJNwlpevRf61SCxehSqeVfnc4TDsAGyRa9nSbR8= test@novit.io