Files
dkl/src/dynlay.rs
2025-07-21 13:18:08 +02:00

189 lines
5.9 KiB
Rust

use eyre::{format_err, Result};
use log::{debug, error, info, warn};
use std::path::PathBuf;
use tokio::{fs, io::AsyncWriteExt, process::Command};
use crate::fs::spawn_walk_dir;
/// Settings used to fetch, mount and link dynamic layers.
///
/// Layers are downloaded from `url_prefix`, stored below `layers_dir`, and
/// their content is symlinked into `chroot`.
#[derive(Debug, Clone)]
pub struct Dynlay<'t> {
    /// Base URL; a layer is fetched from `{url_prefix}/{layer}/{version}`.
    pub url_prefix: &'t str,
    /// Local directory holding one sub-directory per layer.
    pub layers_dir: &'t str,
    /// Directory the paths found in a mounted layer are joined onto when
    /// creating (or clearing) the symlinks.
    pub chroot: PathBuf,
}
impl<'t> Dynlay<'t> {
pub async fn install(&self, layer: &str, version: &str) -> Result<()> {
let lay_dir = &format!("{base}/{layer}", base = self.layers_dir);
debug!("mkdir -p {lay_dir}");
fs::create_dir_all(lay_dir).await?;
let lay_path = &format!("{lay_dir}/{version}");
if !fs::try_exists(lay_path).await? {
let part_file = &format!("{lay_dir}/{version}.tmp");
self.fetch(layer, version, part_file).await?;
(fs::rename(part_file, lay_path).await)
.map_err(|e| format_err!("failed mv {part_file} {lay_path}: {e}"))?;
}
let mount_path = PathBuf::from(lay_dir).join("mounts").join(layer);
(fs::create_dir_all(&mount_path).await)
.map_err(|e| format_err!("mkdir -p {mount_path:?} failed: {e}"))?;
let mount_path = &fs::canonicalize(mount_path).await?;
let mount_path_str = &mount_path.to_string_lossy().into_owned();
if is_mount_point(mount_path_str).await? {
info!("clearing previous mount");
let mut paths = spawn_walk_dir(mount_path.clone());
while let Some(result) = paths.recv().await {
let Ok((path, md)) = result else {
continue;
};
if !md.is_dir() {
let path = self.chroot.join(&path);
debug!("rm {path:?}");
if let Err(e) = fs::remove_file(&path).await {
warn!("rm {path:?} failed: {e}");
}
}
}
sudo("umount", &[mount_path]).await?;
}
// mount layer
info!("mounting layer");
sudo("mount", &["-t", "squashfs", lay_path, &mount_path_str]).await?;
let mut paths = spawn_walk_dir(mount_path.clone());
while let Some(result) = paths.recv().await {
let Ok((path, md)) = result else {
continue;
};
let target = self.chroot.join(&path);
if md.is_dir() {
debug!("mkdir -p {target:?}");
if let Err(e) = fs::create_dir_all(&target).await {
error!("mkdir -p {target:?} failed: {e}");
}
} else {
let _ = fs::remove_file(&target).await;
let source = mount_path.join(&path);
debug!("ln -s {source:?} {target:?}");
if let Err(e) = fs::symlink(&source, &target).await {
error!("ln -s {source:?} {target:?} failed: {e}");
}
}
}
Ok(())
}
async fn fetch(&self, layer: &str, version: &str, part_file: &str) -> Result<()> {
let url = &format!("{}/{layer}/{version}", self.url_prefix);
info!("fetching {url}");
let mut out = (fs::File::create(part_file).await)
.map_err(|e| format_err!("failed to open {part_file}: {e}"))?;
let resp = reqwest::get(url).await?;
if !resp.status().is_success() {
return Err(format_err!("fetch failed: {}", resp.status()));
}
let sha1 = (resp.headers().get("x-content-sha1"))
.ok_or(format_err!("no content hash in response"))?;
let sha1 = (sha1.to_str()).map_err(|e| format_err!("invalid sha1: {e}"))?;
debug!("content sha1: {sha1}");
let mut exp_sha1 = [0; 20];
hex::decode_to_slice(sha1, &mut exp_sha1).map_err(|e| format_err!("invalid sha1: {e}"))?;
let mut hash = openssl::sha::Sha1::new();
use futures::StreamExt;
let mut stream = resp.bytes_stream();
while let Some(bytes) = stream.next().await {
let bytes = bytes.map_err(|e| format_err!("remote read error: {e}"))?;
hash.update(&bytes);
(out.write_all(&bytes).await).map_err(|e| format_err!("local write error: {e}"))?;
}
(out.flush().await).map_err(|e| format_err!("local write error: {e}"))?;
drop(out);
let dl_sha1 = hash.finish();
if dl_sha1 != exp_sha1 {
if let Err(e) = fs::remove_file(part_file).await {
error!("failed to remove {part_file}: {e}");
}
return Err(format_err!(
"invalid content hash: expected {exp}, got {got}",
exp = hex::encode(exp_sha1),
got = hex::encode(dl_sha1)
));
}
Ok(())
}
}
/// Runs `program` with `args` and waits for it to exit.
///
/// When the effective user is root the program is executed directly;
/// otherwise it is executed through `sudo`, with `program` as its first
/// argument.
///
/// # Errors
///
/// Fails when the command cannot be run or exits with a non-success status.
async fn sudo<I, S>(program: &str, args: I) -> Result<()>
where
    I: IntoIterator<Item = S>,
    S: AsRef<std::ffi::OsStr>,
{
    let is_root = nix::unistd::geteuid().is_root();

    let mut command = Command::new(if is_root { program } else { "sudo" });
    if !is_root {
        command.arg(program);
    }
    command.args(args);

    let exit = command.status().await?;
    if !exit.success() {
        return Err(format_err!("{program} failed: {exit}"));
    }

    Ok(())
}
/// Decodes the `\ooo` octal escapes found in a mount table field.
///
/// A backslash that is not followed by three octal digits encoding a single
/// byte is kept as is. Returns the input unchanged (borrowed) when it
/// contains no backslash.
fn unescape_mount_field(field: &str) -> std::borrow::Cow<'_, str> {
    if !field.contains('\\') {
        return std::borrow::Cow::Borrowed(field);
    }

    let bytes = field.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\\' {
            // First digit is limited to 0..=3 so the value fits in one byte.
            if let Some(&[a @ b'0'..=b'3', b @ b'0'..=b'7', c @ b'0'..=b'7']) =
                bytes.get(i + 1..i + 4)
            {
                out.push((a - b'0') * 64 + (b - b'0') * 8 + (c - b'0'));
                i += 4;
                continue;
            }
        }

        out.push(bytes[i]);
        i += 1;
    }

    std::borrow::Cow::Owned(String::from_utf8_lossy(&out).into_owned())
}

/// Tells whether `target` is listed as a mount point in `/proc/self/mounts`.
///
/// Each line is split on ASCII whitespace; the second field is the mount
/// point. Empty lines, lines starting with `#` and lines with fewer than six
/// fields are ignored. The mount point field is unescaped before comparison,
/// because the kernel writes characters such as space as `\040` there, and
/// an exact string match against `target` is required.
///
/// # Errors
///
/// Fails when `/proc/self/mounts` cannot be read.
async fn is_mount_point(target: &str) -> Result<bool> {
    let mounts = fs::read_to_string("/proc/self/mounts").await?;

    for line in mounts.lines() {
        let line = line.trim_ascii();

        if line.is_empty() || line.starts_with('#') {
            continue;
        }

        // Fields: dev, mount_point, fstype, mntops, fs_freq, fsck_passno.
        let mut fields = line.split_ascii_whitespace();

        let Some(mount_point) = fields.nth(1) else {
            continue;
        };

        // Two fields consumed so far; require the four remaining ones.
        if fields.count() < 4 {
            continue;
        }

        let mount_point = unescape_mount_field(mount_point);
        if &*mount_point == target {
            return Ok(true);
        }
    }

    Ok(false)
}