Files
dkl/src/rc/runner.rs
T

264 lines
6.6 KiB
Rust
Raw Normal View History

2026-04-16 11:53:37 +02:00
use log::{error, warn};
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use std::num::NonZero;
use tokio::{
process, select,
sync::{mpsc, watch},
time::{sleep, sleep_until, Duration, Instant},
};
use super::{Error, Result, Service};
use crate::logger::Logger;
/// Directory handed to the [`Logger`] of every started service.
const LOG_PATH: &str = "/var/log";

/// Pause between the exit of a service process and its automatic restart.
const RESTART_DELAY: Duration = Duration::from_secs(8);

/// Time a stopping process gets to exit after SIGTERM before SIGKILL is sent.
const TERM_DELAY: Duration = Duration::from_secs(30);

/// Extra time allowed for the process to exit once SIGKILL has been sent.
const KILL_DELAY: Duration = Duration::from_secs(10);
/// Command sent to a [`Runner`] through the sender returned by [`new`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cmd {
    /// Start the service process (no-op if it is already running).
    Start,
    /// Stop the service process and cancel any pending automatic restart.
    Stop,
}
/// Lifecycle state of a supervised service, as published in [`Child::state`].
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Initial state, published when the runner begins and before any start.
    #[default]
    NeverStarted,
    /// A start is in progress (logger setup and spawn).
    Starting,
    /// The process has been spawned successfully.
    Running,
    /// The process exited (whatever its status) or could not be started;
    /// [`Child::msg`] carries the details.
    Crashed,
    /// SIGTERM has been sent and the runner is waiting for the process to exit.
    Stopping,
    /// The process exited after a stop request.
    Stopped,
    /// The runner has terminated; no further updates follow.
    Finalized,
}
/// Creates a [`Runner`] for service `svc` in control group `cg`.
///
/// Returns, in order:
/// - the runner, to be driven with [`Runner::run`];
/// - a watch receiver that observes the published [`Child`] status;
/// - the sender used to issue [`Cmd`]s (single-slot channel). Dropping every
///   clone of it makes the runner stop the service and finish.
pub fn new(
    cg: impl Into<String>,
    svc: impl Into<String>,
    service: Service,
) -> (Runner, watch::Receiver<Child>, mpsc::Sender<Cmd>) {
    let (cmd_sender, cmd_receiver) = mpsc::channel(1);
    let (manager, status_rx) = ProcessManager::new(service);
    (
        Runner {
            cg: cg.into(),
            svc: svc.into(),
            cmds_rx: cmd_receiver,
            manager,
        },
        status_rx,
        cmd_sender,
    )
}
/// Supervises one service: applies the [`Cmd`]s received on its command
/// channel and lets its process manager handle exits and automatic restarts.
pub struct Runner {
    /// Control group name; passed to the process manager when starting.
    cg: String,
    /// Service name; passed to the process manager when starting.
    svc: String,
    /// Receiving side of the command channel created by `new`.
    cmds_rx: mpsc::Receiver<Cmd>,
    /// Owns the service process and publishes its status.
    manager: ProcessManager,
}
impl Runner {
    /// Drives the service until every command sender has been dropped.
    ///
    /// Each iteration races the process manager (child exit or pending
    /// restart deadline) against the command channel; whichever produces a
    /// command first gets it applied. When the channel closes, the service is
    /// stopped and `State::Finalized` is published.
    pub async fn run(mut self) {
        self.manager.update(State::NeverStarted);
        loop {
            let next = select! {
                auto = self.manager.manage() => auto,
                received = self.cmds_rx.recv() => {
                    // `None`: the command side has been dropped.
                    if received.is_none() {
                        break;
                    }
                    received
                }
            };
            if let Some(cmd) = next {
                self.process_cmd(cmd).await;
            }
        }
        self.process_cmd(Cmd::Stop).await;
        self.manager.update(State::Finalized);
    }

    /// Forwards a single command to the process manager.
    async fn process_cmd(&mut self, cmd: Cmd) {
        match cmd {
            Cmd::Start => self.manager.start(&self.cg, &self.svc).await,
            Cmd::Stop => self.manager.stop().await,
        }
    }
}
/// Owns the service process and publishes its status as [`Child`] snapshots.
struct ProcessManager {
    /// Service command line: `start` hands its first item to `Logger::setup`
    /// as the command and the remaining items as its arguments.
    service: Service,
    /// Publishes the current status to the receivers created alongside it.
    child_tx: watch::Sender<Child>,
    /// The running process, if any.
    process: Option<process::Child>,
    /// When set and no process is running, `manage` asks for a start once
    /// this instant is reached.
    restart_deadline: Option<Instant>,
}
impl ProcessManager {
fn new(service: Service) -> (Self, watch::Receiver<Child>) {
let (child_tx, child_rx) = watch::channel(Child::default());
let pm = Self {
service,
child_tx,
process: None,
restart_deadline: None,
};
(pm, child_rx)
}
/// runs a management iteration (ie: waiting for the child or a restart deadline).
async fn manage(&mut self) -> Option<Cmd> {
if let Some(process) = self.process.as_mut() {
let msg = match process.wait().await {
Ok(status) => status.to_string(),
Err(e) => e.to_string(),
};
self.crashed(msg);
self.process = None;
self.restart_deadline = Some(Instant::now() + RESTART_DELAY);
None
} else if let Some(deadline) = self.restart_deadline {
sleep_until(deadline).await;
Some(Cmd::Start)
} else {
std::future::pending().await
}
}
async fn start(&mut self, cg: &str, svc: &str) {
if self.process.is_some() {
return;
}
self.update(State::Starting);
let logger = Logger {
log_path: LOG_PATH.into(),
log_name: svc.into(),
with_prefix: false,
cgroup: Some(cg.into()),
};
let mut args = self.service.iter();
let Some(cmd) = args.next() else {
error!("{cg}/{svc}: empty command");
return;
};
let Ok(cmd) = (logger.setup(cmd, args).await)
.inspect_err(|e| self.crashed(format!("setup failed: {e}")))
else {
return;
};
let Ok(child) = logger
.spawn(cmd)
.inspect_err(|e| self.crashed(format!("exec failed: {e}")))
else {
return;
};
self.process = Some(child);
self.restart_deadline = None;
self.update(State::Running);
}
async fn stop(&mut self) {
self.restart_deadline = None;
let Some(mut process) = self.process.take() else {
return;
};
let Some(pid) = process.id() else {
let _ = process.wait().await; // already dead, reap it
self.update(State::Stopped);
return;
};
let pid = pid as i32;
self.update_full(pid, State::Stopping, None);
let pid = Pid::from_raw(pid);
let _ = kill(pid, Signal::SIGTERM).inspect_err(|e| error!("kill -TERM {pid} failed: {e}"));
select! {
_ = process.wait() => {
self.update(State::Stopped);
return
},
_ = sleep(TERM_DELAY) => {
warn!("process {pid} did not exit during the grace period, killing");
let _ = process.kill().await.inspect_err(|e| error!("kill -KILL {pid} failed: {e}"));
}
}
select! {
_ = process.wait() => {
self.update(State::Stopped);
return
},
_ = sleep(KILL_DELAY) => {
error!("process {pid} still alive after SIGKILL");
}
}
}
fn process_pid(&self) -> i32 {
(self.process.as_ref())
.and_then(|c| Some(c.id()? as i32))
.unwrap_or(0)
}
fn update(&self, state: State) {
let pid = self.process_pid();
self.update_full(pid, state, None);
}
fn update_full(&self, pid: i32, state: State, msg: Option<String>) {
self.child_tx.send_replace(Child {
pid: NonZero::new(pid),
state,
msg,
});
}
fn crashed(&self, msg: String) {
let pid = self.process_pid();
self.update_full(pid, State::Crashed, Some(msg));
}
}
/// Status snapshot of a supervised service, published through a watch channel.
#[derive(Clone, Default, Debug)]
pub struct Child {
    /// PID of the process when one is known; `None` otherwise.
    pub pid: Option<NonZero<i32>>,
    /// Current lifecycle state.
    pub state: State,
    /// Details attached to a crash (exit status or error message); `None`
    /// for other updates.
    pub msg: Option<String>,
}
impl Child {
pub fn reload(&self) -> Result<()> {
self.kill(Signal::SIGHUP)
}
pub fn kill(&self, sig: Signal) -> Result<()> {
let Some(pid) = self.pid else {
return Err(Error::ProcessExited);
};
kill(Pid::from_raw(pid.into()), sig).map_err(|e| Error::KillFailed(e))
}
}