diff --git a/.gitignore b/.gitignore index d057c42..2003da7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,12 @@ /target /dls /modd.conf -/m1_bootstrap-config -/config.yaml /dist /dkl /tmp + +/config.yaml + +# dls assets +/cluster_*_* +/host_*_* diff --git a/Cargo.lock b/Cargo.lock index 63de5f0..7e1a754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.19" @@ -117,6 +132,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base32" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "022dfe9eb35f19ebbcb51e0b40a5ab759f46ad60cadf7297e0bd085afb50e076" + [[package]] name = "base64" version = "0.22.1" @@ -170,7 +191,10 @@ version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ + "android-tzdata", + "iana-time-zone", "num-traits", + "windows-link", ] [[package]] @@ -263,6 +287,7 @@ name = "dkl" version = "1.0.0" dependencies = [ "async-compression", + "base32", "bytes", "chrono", "clap", @@ -274,6 +299,7 @@ dependencies = [ "glob", "hex", "log", + "lz4", "nix", "openssl", "page_size", @@ -658,6 +684,30 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.0.0" @@ -897,6 +947,25 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "memchr" version = "2.7.5" @@ -1823,6 +1892,41 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index cad24cc..34359f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,9 @@ codegen-units = 1 [dependencies] async-compression = { version = "0.4.27", features = ["tokio", "zstd"] } +base32 = "0.5.1" bytes = "1.10.1" -chrono = { version = "0.4.41", default-features = false, features = ["now"] } +chrono = { version = "0.4.41", default-features = false, features = ["clock", "now"] } clap = { version = "4.5.40", features = ["derive", "env"] } clap_complete = { version = "4.5.54", features = ["unstable-dynamic"] } env_logger = "0.11.8" @@ -23,6 +24,7 @@ futures-util = "0.3.31" glob = "0.3.2" hex = "0.4.3" log = "0.4.27" +lz4 = "1.28.1" nix = { version = "0.30.1", features = ["user"] } openssl = "0.10.73" page_size = "0.6.0" diff --git a/src/bin/dls.rs b/src/bin/dls.rs index 5aaf339..4dd886f 100644 --- a/src/bin/dls.rs +++ b/src/bin/dls.rs @@ -1,7 +1,11 @@ +use bytes::Bytes; use clap::{CommandFactory, Parser, Subcommand}; -use eyre::{Result, format_err}; +use eyre::format_err; +use futures_util::Stream; use futures_util::StreamExt; -use tokio::io::AsyncWriteExt; +use std::time::{Duration, SystemTime}; +use tokio::fs; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use dkl::dls; @@ -25,9 +29,36 @@ enum Command { }, Hosts, Host { + #[arg(short = 'o', long)] + out: Option, host: String, asset: Option, }, + #[command(subcommand)] + DlSet(DlSet), +} + +#[derive(Subcommand)] +enum DlSet { + Sign { + #[arg(short = 'e', long, default_value = "1d")] + expiry: String, + #[arg(value_parser = parse_download_set_item)] + items: Vec, + }, + Show { + #[arg(env = "DLS_DLSET")] + signed_set: String, + }, + Fetch { + #[arg(long, env = "DLS_DLSET")] + signed_set: String, + #[arg(short = 'o', long)] + out: Option, + kind: String, + name: String, + asset: String, + }, } #[derive(Subcommand)] @@ -62,7 +93,7 @@ enum ClusterCommand { } #[tokio::main(flavor = "current_thread")] -async fn main() -> Result<()> { +async fn main() -> eyre::Result<()> { clap_complete::CompleteEnv::with_factory(Cli::command).complete(); let cli = Cli::parse(); @@ -125,44 +156,110 @@ async fn main() -> Result<()> { } } C::Hosts => write_json(&dls.hosts().await?), - C::Host { host, asset } => { + C::Host { out, host, asset } => { let host_name = host.clone(); let host = dls.host(host); match asset { None => write_json(&host.config().await?), Some(asset) => { - let mut stream = host.asset(&asset).await?; - - let out_path = format!("{host_name}_{asset}"); - eprintln!("writing {host_name} asset {asset} to {out_path}"); - - let out = tokio::fs::File::options() - .mode(0o600) - .write(true) - .create(true) - .truncate(true) - .open(out_path) - .await?; - let mut out = tokio::io::BufWriter::new(out); - - let mut n = 0u64; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - n += chunk.len() as u64; - eprint!("wrote {n} bytes\r"); - out.write_all(&chunk).await?; - } - eprintln!(); - - out.flush().await?; + let stream = host.asset(&asset).await?; + let mut out = create_asset_file(out, "host", &host_name, &asset).await?; + copy_stream(stream, &mut out).await?; } } } + C::DlSet(set) => match set { + DlSet::Sign { expiry, items } => { + let req = dls::DownloadSetReq { expiry, items }; + let signed = dls.sign_dl_set(&req).await?; + println!("{signed}"); + } + DlSet::Show { signed_set } => { + let raw = base32::decode(base32::Alphabet::Rfc4648 { padding: false }, &signed_set) + .ok_or(format_err!("invalid dlset"))?; + + let sig_len = raw[0] as usize; + let (sig, data) = raw[1..].split_at(sig_len); + println!("signature: {}...", hex::encode(&sig[..16])); + + let data = lz4::Decoder::new(data)?; + let data = std::io::read_to_string(data)?; + + let (expiry, items) = data.split_once('|').ok_or(format_err!("invalid dlset"))?; + let expiry = i64::from_str_radix(expiry, 16)?; + let expiry = chrono::DateTime::from_timestamp(expiry, 0).unwrap(); + + println!("expires on {expiry}"); + + for item in items.split('|') { + let mut parts = item.split(':'); + let Some(kind) = parts.next() else { + continue; + }; + let Some(name) = parts.next() else { + continue; + }; + for asset in parts { + println!("- {kind} {name} {asset}"); + } + } + } + DlSet::Fetch { + signed_set, + out, + kind, + name, + asset, + } => { + let stream = dls.fetch_dl_set(&signed_set, &kind, &name, &asset).await?; + let mut out = create_asset_file(out, &kind, &name, &asset).await?; + copy_stream(stream, &mut out).await?; + } + }, }; Ok(()) } +async fn create_asset_file( + path: Option, + kind: &str, + name: &str, + asset: &str, +) -> std::io::Result { + let path = &path.unwrap_or(format!("{kind}_{name}_{asset}")); + eprintln!("writing {kind} {name} asset {asset} to {path}"); + (fs::File::options().write(true).create(true).truncate(true)) + .mode(0o600) + .open(path) + .await +} + +async fn copy_stream( + mut stream: impl Stream> + Unpin, + out: &mut (impl AsyncWrite + Unpin), +) -> std::io::Result<()> { + let mut out = tokio::io::BufWriter::new(out); + + let info_delay = Duration::from_secs(1); + let mut ts = SystemTime::now(); + + let mut n = 0u64; + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| std::io::Error::other(e))?; + n += chunk.len() as u64; + out.write_all(&chunk).await?; + + if ts.elapsed().is_ok_and(|t| t >= info_delay) { + eprint!("wrote {n} bytes\r"); + ts = SystemTime::now(); + } + } + eprintln!("wrote {n} bytes"); + + out.flush().await +} + fn write_json(v: &T) { let data = serde_json::to_string_pretty(v).expect("value should serialize to json"); println!("{data}"); @@ -175,3 +272,17 @@ fn write_raw(raw: &[u8]) { out.write(raw).expect("stdout write"); out.flush().expect("stdout flush"); } + +fn parse_download_set_item(s: &str) -> Result { + let err = |s: &str| std::io::Error::other(s); + + let mut parts = s.split(':'); + + let item = dls::DownloadSetItem { + kind: parts.next().ok_or(err("no kind"))?.to_string(), + name: parts.next().ok_or(err("no name"))?.to_string(), + assets: parts.map(|p| p.to_string()).collect(), + }; + + Ok(item) +} diff --git a/src/dls.rs b/src/dls.rs index 44e41f2..cc8c63b 100644 --- a/src/dls.rs +++ b/src/dls.rs @@ -45,12 +45,26 @@ impl Client { Host { dls: self, name } } - pub async fn get_json(&self, path: impl Display) -> Result { - let req = self.get(&path)?.header("Accept", "application/json"); + pub async fn sign_dl_set(&self, req: &DownloadSetReq) -> Result { + let req = (self.req(Method::POST, "sign-download-set")?).json(req); + self.req_json(req).await + } + pub async fn fetch_dl_set( + &self, + signed_dlset: &str, + kind: &str, + name: &str, + asset: &str, + ) -> Result>> { + let req = self.get(format!( + "public/download-set/{kind}/{name}/{asset}?set={signed_dlset}" + ))?; let resp = do_req(req, &self.token).await?; + Ok(resp.bytes_stream()) + } - let body = resp.bytes().await.map_err(Error::Read)?; - serde_json::from_slice(&body).map_err(Error::Parse) + pub async fn get_json(&self, path: impl Display) -> Result { + self.req_json(self.get(&path)?).await } pub async fn get_bytes(&self, path: impl Display) -> Result> { let resp = do_req(self.get(&path)?, &self.token).await?; @@ -60,6 +74,16 @@ impl Client { self.req(Method::GET, path) } + pub async fn req_json( + &self, + req: reqwest::RequestBuilder, + ) -> Result { + let req = req.header("Accept", "application/json"); + let resp = do_req(req, &self.token).await?; + + let body = resp.bytes().await.map_err(Error::Read)?; + serde_json::from_slice(&body).map_err(Error::Parse) + } pub fn req(&self, method: Method, path: impl Display) -> Result { let uri = format!("{}/{path}", self.base_url); @@ -184,6 +208,23 @@ pub struct KubeSignReq { pub validity: Option, } +#[derive(serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct DownloadSetReq { + pub expiry: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub items: Vec, +} + +#[derive(Clone, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct DownloadSetItem { + pub kind: String, + pub name: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub assets: Vec, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] struct ServerError { #[serde(default)] diff --git a/test-dls b/test-dls index 0f656d2..a05a656 100755 --- a/test-dls +++ b/test-dls @@ -18,3 +18,12 @@ $dls cluster cluster ssh-sign ~/.ssh/id_ed25519.pub $dls host m1 | jq '{Name, ClusterName, IPs}' $dls host m1 bootstrap-config +export DLS_DLSET=$($dls dl-set sign --expiry 1d \ + cluster:cluster:addons \ + host:m1:kernel:initrd:bootstrap.tar \ + host:m2:config:bootstrap-config:boot.vmdk) + +$dls dl-set show +$dls dl-set fetch host m2 bootstrap-config +rm host_m2_bootstrap-config +