Compare commits

..

7 Commits

Author SHA1 Message Date
1e047afac3 cargo update 2025-09-04 05:41:20 +02:00
6f059287ec minor fix in write_raw 2025-08-05 09:37:26 +02:00
bbea9b9c00 dls dlset support 2025-07-22 18:37:49 +02:00
be5db231d9 remove rsmount dependency 2025-07-21 13:18:08 +02:00
b01c41b856 chore 2025-07-21 12:26:44 +02:00
4ccda5039b logger: oh, with_extension is replace, not append 2025-07-21 10:11:05 +02:00
852738bec3 logger: use official zst extension (not zstd) 2025-07-21 09:49:02 +02:00
9 changed files with 507 additions and 562 deletions

8
.gitignore vendored
View File

@@ -1,8 +1,12 @@
/target
/dls
/modd.conf
/m1_bootstrap-config
/config.yaml
/dist
/dkl
/tmp
/config.yaml
# dls assets
/cluster_*_*
/host_*_*

772
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -12,8 +12,9 @@ codegen-units = 1
[dependencies]
async-compression = { version = "0.4.27", features = ["tokio", "zstd"] }
base32 = "0.5.1"
bytes = "1.10.1"
chrono = { version = "0.4.41", default-features = false, features = ["now"] }
chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] }
clap = { version = "4.5.40", features = ["derive", "env"] }
clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] }
env_logger = "0.11.8"
@@ -23,11 +24,11 @@ futures-util = "0.3.31"
glob = "0.3.2"
hex = "0.4.3"
log = "0.4.27"
lz4 = "1.28.1"
nix = { version = "0.30.1", features = ["user"] }
openssl = "0.10.73"
page_size = "0.6.0"
reqwest = { version = "0.12.20", features = ["json", "stream"] }
rsmount = "0.2.1"
reqwest = { version = "0.12.20", features = ["json", "stream", "native-tls"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_yaml = "0.9.34"

View File

@@ -1,7 +1,11 @@
use bytes::Bytes;
use clap::{CommandFactory, Parser, Subcommand};
use eyre::{Result, format_err};
use eyre::format_err;
use futures_util::Stream;
use futures_util::StreamExt;
use tokio::io::AsyncWriteExt;
use std::time::{Duration, SystemTime};
use tokio::fs;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use dkl::dls;
@@ -25,9 +29,36 @@ enum Command {
},
Hosts,
Host {
#[arg(short = 'o', long)]
out: Option<String>,
host: String,
asset: Option<String>,
},
#[command(subcommand)]
DlSet(DlSet),
}
#[derive(Subcommand)]
enum DlSet {
Sign {
#[arg(short = 'e', long, default_value = "1d")]
expiry: String,
#[arg(value_parser = parse_download_set_item)]
items: Vec<dls::DownloadSetItem>,
},
Show {
#[arg(env = "DLS_DLSET")]
signed_set: String,
},
Fetch {
#[arg(long, env = "DLS_DLSET")]
signed_set: String,
#[arg(short = 'o', long)]
out: Option<String>,
kind: String,
name: String,
asset: String,
},
}
#[derive(Subcommand)]
@@ -62,7 +93,7 @@ enum ClusterCommand {
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
async fn main() -> eyre::Result<()> {
clap_complete::CompleteEnv::with_factory(Cli::command).complete();
let cli = Cli::parse();
@@ -125,44 +156,110 @@ async fn main() -> Result<()> {
}
}
C::Hosts => write_json(&dls.hosts().await?),
C::Host { host, asset } => {
C::Host { out, host, asset } => {
let host_name = host.clone();
let host = dls.host(host);
match asset {
None => write_json(&host.config().await?),
Some(asset) => {
let mut stream = host.asset(&asset).await?;
let out_path = format!("{host_name}_{asset}");
eprintln!("writing {host_name} asset {asset} to {out_path}");
let out = tokio::fs::File::options()
.mode(0o600)
.write(true)
.create(true)
.truncate(true)
.open(out_path)
.await?;
let mut out = tokio::io::BufWriter::new(out);
let mut n = 0u64;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
n += chunk.len() as u64;
eprint!("wrote {n} bytes\r");
out.write_all(&chunk).await?;
}
eprintln!();
out.flush().await?;
let stream = host.asset(&asset).await?;
let mut out = create_asset_file(out, "host", &host_name, &asset).await?;
copy_stream(stream, &mut out).await?;
}
}
}
C::DlSet(set) => match set {
DlSet::Sign { expiry, items } => {
let req = dls::DownloadSetReq { expiry, items };
let signed = dls.sign_dl_set(&req).await?;
println!("{signed}");
}
DlSet::Show { signed_set } => {
let raw = base32::decode(base32::Alphabet::Rfc4648 { padding: false }, &signed_set)
.ok_or(format_err!("invalid dlset"))?;
let sig_len = raw[0] as usize;
let (sig, data) = raw[1..].split_at(sig_len);
println!("signature: {}...", hex::encode(&sig[..16]));
let data = lz4::Decoder::new(data)?;
let data = std::io::read_to_string(data)?;
let (expiry, items) = data.split_once('|').ok_or(format_err!("invalid dlset"))?;
let expiry = i64::from_str_radix(expiry, 16)?;
let expiry = chrono::DateTime::from_timestamp(expiry, 0).unwrap();
println!("expires on {expiry}");
for item in items.split('|') {
let mut parts = item.split(':');
let Some(kind) = parts.next() else {
continue;
};
let Some(name) = parts.next() else {
continue;
};
for asset in parts {
println!("- {kind} {name} {asset}");
}
}
}
DlSet::Fetch {
signed_set,
out,
kind,
name,
asset,
} => {
let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?;
let mut out = create_asset_file(out, &kind, &name, &asset).await?;
copy_stream(stream, &mut out).await?;
}
},
};
Ok(())
}
/// Open (create/truncate) the output file for a downloaded asset.
///
/// When `path` is `None`, the file name defaults to `{kind}_{name}_{asset}`
/// in the current directory. The file is created with mode 0o600 since
/// assets may contain sensitive data (configs, keys).
async fn create_asset_file(
    path: Option<String>,
    kind: &str,
    name: &str,
    asset: &str,
) -> std::io::Result<fs::File> {
    // unwrap_or_else: only build the default name when no path was given
    // (unwrap_or would format/allocate unconditionally).
    let path = &path.unwrap_or_else(|| format!("{kind}_{name}_{asset}"));
    eprintln!("writing {kind} {name} asset {asset} to {path}");
    (fs::File::options().write(true).create(true).truncate(true))
        .mode(0o600) // owner-only: asset content may be sensitive
        .open(path)
        .await
}
/// Copy a byte stream into `out` through a buffered writer, printing a
/// progress line on stderr at most once per second. Flushes before returning.
async fn copy_stream(
    mut stream: impl Stream<Item = reqwest::Result<Bytes>> + Unpin,
    out: &mut (impl AsyncWrite + Unpin),
) -> std::io::Result<()> {
    let mut out = tokio::io::BufWriter::new(out);
    let info_delay = Duration::from_secs(1);
    // Instant is monotonic; SystemTime can jump backwards (NTP, manual clock
    // change), which would suppress or garble the progress reporting and
    // forced the old `elapsed().is_ok_and(..)` dance.
    let mut ts = std::time::Instant::now();
    let mut n = 0u64;
    while let Some(chunk) = stream.next().await {
        // surface transport errors as io::Error so the fn has a single error type
        let chunk = chunk.map_err(std::io::Error::other)?;
        n += chunk.len() as u64;
        out.write_all(&chunk).await?;
        if ts.elapsed() >= info_delay {
            eprint!("wrote {n} bytes\r");
            ts = std::time::Instant::now();
        }
    }
    eprintln!("wrote {n} bytes");
    out.flush().await
}
fn write_json<T: serde::ser::Serialize>(v: &T) {
let data = serde_json::to_string_pretty(v).expect("value should serialize to json");
println!("{data}");
@@ -172,6 +269,20 @@ fn write_raw(raw: &[u8]) {
use std::io::Write;
let mut out = std::io::stdout();
out.write(raw).expect("stdout write");
out.write_all(raw).expect("stdout write");
out.flush().expect("stdout flush");
}
/// Parse a `kind:name[:asset...]` CLI argument into a [`dls::DownloadSetItem`].
///
/// The first two `:`-separated fields (kind, name) are required and must be
/// non-empty; any remaining fields become the asset list (possibly empty).
/// Note: `split(':')` always yields at least one (possibly empty) field, so
/// without the emptiness check, inputs like `:name:asset` would silently
/// produce an item with an empty kind.
fn parse_download_set_item(s: &str) -> Result<dls::DownloadSetItem, std::io::Error> {
    let err = |s: &str| std::io::Error::other(s);
    let mut parts = s.split(':');
    // take a field only if present AND non-empty; build the error lazily
    let mut field = |what: &str| {
        (parts.next().filter(|f| !f.is_empty()))
            .map(str::to_string)
            .ok_or_else(|| err(what))
    };
    let kind = field("no kind")?;
    let name = field("no name")?;
    let assets = parts.map(str::to_string).collect();
    Ok(dls::DownloadSetItem { kind, name, assets })
}

View File

@@ -33,7 +33,7 @@ impl Client {
self.get_json("clusters").await
}
pub fn cluster(&self, name: String) -> Cluster {
pub fn cluster(&self, name: String) -> Cluster<'_> {
Cluster { dls: self, name }
}
@@ -41,16 +41,30 @@ impl Client {
self.get_json("hosts").await
}
pub fn host(&self, name: String) -> Host {
pub fn host(&self, name: String) -> Host<'_> {
Host { dls: self, name }
}
pub async fn get_json<T: serde::de::DeserializeOwned>(&self, path: impl Display) -> Result<T> {
let req = self.get(&path)?.header("Accept", "application/json");
pub async fn sign_dl_set(&self, req: &DownloadSetReq) -> Result<String> {
let req = (self.req(Method::POST, "sign-download-set")?).json(req);
self.req_json(req).await
}
pub async fn fetch_dl_set(
&self,
signed_dlset: &str,
kind: &str,
name: &str,
asset: &str,
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>> {
let req = self.get(format!(
"public/download-set/{kind}/{name}/{asset}?set={signed_dlset}"
))?;
let resp = do_req(req, &self.token).await?;
Ok(resp.bytes_stream())
}
let body = resp.bytes().await.map_err(Error::Read)?;
serde_json::from_slice(&body).map_err(Error::Parse)
pub async fn get_json<T: serde::de::DeserializeOwned>(&self, path: impl Display) -> Result<T> {
self.req_json(self.get(&path)?).await
}
pub async fn get_bytes(&self, path: impl Display) -> Result<Vec<u8>> {
let resp = do_req(self.get(&path)?, &self.token).await?;
@@ -60,6 +74,16 @@ impl Client {
self.req(Method::GET, path)
}
pub async fn req_json<T: serde::de::DeserializeOwned>(
&self,
req: reqwest::RequestBuilder,
) -> Result<T> {
let req = req.header("Accept", "application/json");
let resp = do_req(req, &self.token).await?;
let body = resp.bytes().await.map_err(Error::Read)?;
serde_json::from_slice(&body).map_err(Error::Parse)
}
pub fn req(&self, method: Method, path: impl Display) -> Result<reqwest::RequestBuilder> {
let uri = format!("{}/{path}", self.base_url);
@@ -184,6 +208,23 @@ pub struct KubeSignReq {
pub validity: Option<String>,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct DownloadSetReq {
pub expiry: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub items: Vec<DownloadSetItem>,
}
#[derive(Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct DownloadSetItem {
pub kind: String,
pub name: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub assets: Vec<String>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct ServerError {
#[serde(default)]

View File

@@ -30,17 +30,14 @@ impl<'t> Dynlay<'t> {
}
let mount_path = PathBuf::from(lay_dir).join("mounts").join(layer);
let mount_path_str = mount_path.to_string_lossy().into_owned();
(fs::create_dir_all(&mount_path).await)
.map_err(|e| format_err!("mkdir -p {mount_path:?} failed: {e}"))?;
let mount_path = &fs::canonicalize(mount_path).await?;
let mount_path_str = &mount_path.to_string_lossy().into_owned();
let mut mount_info = rsmount::tables::MountInfo::new()?;
mount_info.import_mountinfo()?;
if mount_info.find_target(mount_path).is_some() {
if is_mount_point(mount_path_str).await? {
info!("clearing previous mount");
let mut paths = spawn_walk_dir(mount_path.clone());
@@ -163,3 +160,29 @@
Err(format_err!("{program} failed: {status}"))
}
}
/// Return whether `target` is currently a mount point, by scanning
/// /proc/self/mounts.
///
/// Fields in the mounts file octal-escape whitespace and backslashes
/// (`\040` space, `\011` tab, `\012` newline, `\134` backslash — see
/// getmntent(3)), so each mount point is unescaped before comparing;
/// otherwise targets containing those characters would never match.
async fn is_mount_point(target: &str) -> Result<bool> {
    let mounts = fs::read_to_string("/proc/self/mounts").await?;
    for line in mounts.lines() {
        let line = line.trim_ascii();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // fields: dev, mount_point, fstype, mntops, fs_freq, fsck_passno
        let split: Vec<_> = line.split_ascii_whitespace().collect();
        if split.len() < 6 {
            continue;
        }
        if unescape_mounts_field(split[1]) == target {
            return Ok(true);
        }
    }
    Ok(false)
}

/// Decode the octal escapes used in /proc mounts fields (getmntent(3)).
fn unescape_mounts_field(field: &str) -> String {
    field
        .replace("\\040", " ")
        .replace("\\011", "\t")
        .replace("\\012", "\n")
        .replace("\\134", "\\")
}

View File

@@ -243,7 +243,7 @@ async fn compress(path: impl AsRef<Path>) {
.await
.map_err(|e| format_err!("open {path_str} failed: {e}"))?;
let out_path = path.with_extension("zstd");
let out_path = path.with_extension("zst");
let out = (File::create(&out_path).await) // create output
.map_err(|e| format_err!("create {} failed: {e}", out_path.to_string_lossy()))?;
@@ -285,14 +285,16 @@ pub async fn log_files(log_path: &str, log_name: &str) -> std::io::Result<Vec<Lo
continue;
};
let (name, compressed) = file_name
.strip_suffix(".zstd")
.map_or((file_name, false), |s| (s, true));
let Some(name) = name.strip_suffix(".log") else {
let Some((name, ext)) = file_name.rsplit_once('.') else {
continue;
};
let compressed = match ext {
"zst" => true,
"log" => false,
_ => continue,
};
let Some((name, timestamp)) = name.rsplit_once('.') else {
continue;
};

View File

@@ -5,7 +5,7 @@ dkl=target/debug/dkl
test=${1:-dynlay}
export RUST_LOG=debug,rsmount=info
export RUST_LOG=debug
case $test in
apply-config)
@@ -20,7 +20,7 @@ case $test in
;;
log-ls)
$dkl log --log-path tmp/log bash ls
$dkl log --log-path tmp/log bash ls -l
;;
log-cat)
$dkl log --log-path tmp/log bash cat 20250720_12 20301231_23

View File

@@ -18,3 +18,12 @@ $dls cluster cluster ssh-sign ~/.ssh/id_ed25519.pub
$dls host m1 | jq '{Name, ClusterName, IPs}'
$dls host m1 bootstrap-config
export DLS_DLSET=$($dls dl-set sign --expiry 1d \
cluster:cluster:addons \
host:m1:kernel:initrd:bootstrap.tar \
host:m2:config:bootstrap-config:boot.vmdk)
$dls dl-set show
$dls dl-set fetch host m2 bootstrap-config
rm host_m2_bootstrap-config