From 15fe8c9ce6b2174f91b097603a61904d4bbd620d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mika=C3=ABl=20Cluseau?= Date: Thu, 16 Apr 2026 11:53:37 +0200 Subject: [PATCH] feat(dkl rc) --- Cargo.lock | 245 ++++++++++------------- Cargo.toml | 4 +- src/bin/dkl.rs | 118 +++++++++-- src/logger.rs | 138 ++++++++----- src/rc.rs | 494 +++++++++++++++++++++++++++++++++++++++++++++++ src/rc/runner.rs | 263 +++++++++++++++++++++++++ 6 files changed, 1048 insertions(+), 214 deletions(-) create mode 100644 src/rc/runner.rs diff --git a/Cargo.lock b/Cargo.lock index 9f75f11..f8844c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,21 +20,6 @@ dependencies = [ "libc", ] -[[package]] -name = "anstream" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" -dependencies = [ - "anstyle", - "anstyle-parse 0.2.7", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - [[package]] name = "anstream" version = "1.0.0" @@ -42,7 +27,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" dependencies = [ "anstyle", - "anstyle-parse 1.0.0", + "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", @@ -56,15 +41,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" -[[package]] -name = "anstyle-parse" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" -dependencies = [ - "utf8parse", -] - [[package]] name = "anstyle-parse" version = "1.0.0" @@ -150,9 +126,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" [[package]] name = "blake2b_simd" @@ -185,9 +161,9 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" -version = "1.2.57" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ "find-msvc-tools", "jobserver", @@ -220,9 +196,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.6.0" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", "clap_derive", @@ -234,7 +210,7 @@ version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ - "anstream 1.0.0", + "anstream", "anstyle", "clap_lex", "strsim", @@ -242,9 +218,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.6.0" +version = "4.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19c9f1dde76b736e3681f28cec9d5a61299cbaae0fce80a68e43724ad56031eb" +checksum = "3ff7a1dccbdd8b078c2bdebff47e404615151534d5043da397ec50286816f9cb" dependencies = [ "clap", "clap_lex", @@ -254,9 +230,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.6.0" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" dependencies = [ "heck", "proc-macro2", @@ -371,9 +347,9 @@ dependencies = [ [[package]] name = "env_filter" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" dependencies = [ "log", "regex", @@ -381,11 +357,11 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.9" +version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" dependencies = [ - "anstream 0.6.21", + "anstream", "anstyle", "env_filter", "jiff", @@ -420,9 +396,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "find-msvc-tools" @@ -596,9 +572,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.16.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" [[package]] name = "heck" @@ -662,9 +638,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" dependencies = [ "atomic-waker", "bytes", @@ -675,7 +651,6 @@ dependencies = [ "httparse", "itoa", "pin-project-lite", - "pin-utils", "smallvec", "tokio", "want", @@ -746,12 +721,13 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" dependencies = [ "displaydoc", "potential_utf", + "utf8_iter", "yoke", "zerofrom", "zerovec", @@ -759,9 +735,9 @@ dependencies = [ [[package]] name = "icu_locale_core" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" dependencies = [ "displaydoc", "litemap", @@ -772,9 +748,9 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" dependencies = [ "icu_collections", "icu_normalizer_data", @@ -786,15 +762,15 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" [[package]] name = "icu_properties" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" dependencies = [ "icu_collections", "icu_locale_core", @@ -806,15 +782,15 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" [[package]] name = "icu_provider" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" dependencies = [ "displaydoc", "icu_locale_core", @@ -860,12 +836,12 @@ checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5" [[package]] name = "indexmap" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "serde", "serde_core", ] @@ -878,9 +854,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.10" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" dependencies = [ "memchr", "serde", @@ -903,9 +879,9 @@ checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itoa" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "jiff" @@ -943,10 +919,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.91" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" dependencies = [ + "cfg-if", + "futures-util", "once_cell", "wasm-bindgen", ] @@ -959,9 +937,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "linux-raw-sys" @@ -971,9 +949,9 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" [[package]] name = "log" @@ -1008,9 +986,9 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "mio" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", @@ -1069,9 +1047,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.76" +version = "0.10.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "951c002c75e16ea2c65b8c7e4d3d51d5530d8dfa7d060b4776828c88cfb18ecf" +checksum = "bfe4646e360ec77dff7dde40ed3d6c5fee52d156ef4a62f53973d38294dad87f" dependencies = [ "bitflags", "cfg-if", @@ -1101,9 +1079,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.112" +version = "0.9.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d55af3b3e226502be1526dfdba67ab0e9c96fc293004e79576b2b9edb0dbdb" +checksum = "ad2f2c0eba47118757e4c6d2bff2838f3e0523380021356e7875e858372ce644" dependencies = [ "cc", "libc", @@ -1150,17 +1128,11 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkg-config" -version = "0.3.32" +version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" [[package]] name = "portable-atomic" @@ -1179,9 +1151,9 @@ dependencies = [ [[package]] name = "potential_utf" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" dependencies = [ "zerovec", ] @@ -1328,12 +1300,12 @@ dependencies = [ [[package]] name = "rtoolbox" -version = "0.0.3" +version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cc970b249fbe527d6e02e0a227762c9108b2f49d81094fe357ffc6d14d7f6f" +checksum = "327b72899159dfae8060c51a1f6aebe955245bcd9cc4997eed0f623caea022e4" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1416,9 +1388,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" [[package]] name = "serde" @@ -1635,9 +1607,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" dependencies = [ "displaydoc", "zerovec", @@ -1645,9 +1617,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.50.0" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" dependencies = [ "bytes", "libc", @@ -1661,9 +1633,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -1852,9 +1824,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" dependencies = [ "cfg-if", "once_cell", @@ -1865,23 +1837,19 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.64" +version = "0.4.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" +checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" dependencies = [ - "cfg-if", - "futures-util", "js-sys", - "once_cell", "wasm-bindgen", - "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1889,9 +1857,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" dependencies = [ "bumpalo", "proc-macro2", @@ -1902,9 +1870,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" dependencies = [ "unicode-ident", ] @@ -1958,9 +1926,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.91" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" dependencies = [ "js-sys", "wasm-bindgen", @@ -2047,15 +2015,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-sys" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-sys" version = "0.59.0" @@ -2302,15 +2261,15 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "yoke" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -2319,9 +2278,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", @@ -2331,18 +2290,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", @@ -2358,9 +2317,9 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zerotrie" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" dependencies = [ "displaydoc", "yoke", @@ -2369,9 +2328,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" dependencies = [ "yoke", "zerofrom", @@ -2380,9 +2339,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index a5c1ab0..0eb862c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ human-units = "0.5.3" log = "0.4.27" lz4 = "1.28.1" memchr = "2.8.0" -nix = { version = "0.31.2", features = ["process", "signal", "user"] } +nix = { version = "0.31.2", features = ["fs", "process", "signal", "user"] } openssl = "0.10.73" page_size = "0.6.0" reqwest = { version = "0.13.1", features = ["json", "stream", "native-tls", "socks"], default-features = false } @@ -41,5 +41,5 @@ serde_yaml = "0.9.34" signal-hook = "0.4.4" tabled = "0.20.0" thiserror = "2.0.12" -tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt"] } +tokio = { version = "1.45.1", features = ["fs", "io-std", "macros", "process", "rt", "signal"] } diff --git a/src/bin/dkl.rs b/src/bin/dkl.rs index 046ac8b..28872c0 100644 --- a/src/bin/dkl.rs +++ b/src/bin/dkl.rs @@ -9,6 +9,8 @@ use tokio::fs; #[derive(Parser)] #[command()] struct Cli { + #[arg(long)] + log_to: Option, #[command(subcommand)] command: Command, } @@ -32,17 +34,17 @@ enum Command { Logger { /// Path where the logs are stored #[arg(long, short = 'p', default_value = "/var/log", env = "DKL_LOG_PATH")] - log_path: String, + log_path: PathBuf, /// Name of the log instead of the command's basename #[arg(long, short = 'n')] - log_name: Option, + log_name: Option, /// prefix log lines with time & stream #[arg(long)] with_prefix: bool, /// exec command in this cgroup #[arg(long)] cgroup: Option, - command: String, + command: PathBuf, args: Vec, }, Log { @@ -90,6 +92,11 @@ enum Command { #[command(subcommand)] cmd: CgCmd, }, + + Rc { + #[command(subcommand)] + cmd: RcCmd, + }, } #[derive(Subcommand)] @@ -104,16 +111,56 @@ enum CgCmd { }, } +#[derive(Subcommand)] +enum RcCmd { + Run, + Ls, + Status, + ReloadConfig, + Start { + #[arg(add = completions(dkl::rc::complete))] + key: String, + }, + Stop { + #[arg(add = completions(dkl::rc::complete))] + key: String, + }, + Reload { + #[arg(add = completions(dkl::rc::complete))] + key: String, + }, + Sig { + #[arg(add = completions(dkl::rc::complete))] + key: String, + signal: u32, + }, + Ctl { + args: Vec, + }, +} + #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { clap_complete::CompleteEnv::with_factory(Cli::command).complete(); let cli = Cli::parse(); - env_logger::builder() - .parse_filters("info") - .parse_default_env() - .init(); + { + let mut builder = env_logger::builder(); + builder.parse_filters("info").parse_default_env(); + + if let Some(log_to) = cli.log_to { + builder.target(env_logger::Target::Pipe(Box::new( + std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(log_to) + .unwrap(), + ))); + } + + builder.init(); + } use Command as C; match cli.command { @@ -127,23 +174,24 @@ async fn main() -> Result<()> { apply_config(&config, &filters, &prefix, dry_run).await } C::Logger { - ref log_path, - ref log_name, + log_path, + log_name, with_prefix, cgroup, command, args, } => { - let command = command.as_str(); - let log_name = log_name.as_deref().unwrap_or_else(|| basename(command)); + let log_name = log_name.unwrap_or_else(|| command.file_prefix().unwrap().into()); - dkl::logger::Logger { + let logger = dkl::logger::Logger { log_path, log_name, with_prefix, - } - .run(cgroup, command, &args) - .await + cgroup, + }; + + let cmd = logger.setup(command, &args).await?; + logger.exec(cmd).await } C::Log { log_path, @@ -187,6 +235,20 @@ async fn main() -> Result<()> { cols, } => Ok(dkl::cgroup::ls(root, &exclude, cols.as_deref()).await?), }, + + C::Rc { cmd } => match cmd { + RcCmd::Run => Ok(dkl::rc::run().await?), + RcCmd::Ls => Ok(dkl::rc::ctl(["ls"]).await?), + RcCmd::Status => Ok(dkl::rc::ctl(["status"]).await?), + RcCmd::ReloadConfig => Ok(dkl::rc::ctl(["reload-config"]).await?), + RcCmd::Start { key } => Ok(dkl::rc::ctl(["start", &key]).await?), + RcCmd::Stop { key } => Ok(dkl::rc::ctl(["stop", &key]).await?), + RcCmd::Reload { key } => Ok(dkl::rc::ctl(["reload", &key]).await?), + RcCmd::Sig { key, signal } => { + Ok(dkl::rc::ctl(["sig", &key, &signal.to_string()]).await?) + } + RcCmd::Ctl { args } => Ok(dkl::rc::ctl(&args).await?), + }, } } @@ -298,10 +360,6 @@ fn parse_ts_arg(ts: Option) -> Result> { } } -fn basename(path: &str) -> &str { - path.rsplit_once('/').map_or(path, |split| split.1) -} - fn parse_globs(filters: &[String]) -> Result> { let mut errors = false; let filters = (filters.iter()) @@ -321,3 +379,25 @@ fn parse_globs(filters: &[String]) -> Result> { Ok(filters) } + +use clap_complete::{ArgValueCandidates, CompletionCandidate}; + +fn completions(f: impl AsyncFn() -> Vec + Send + Sync + 'static) -> ArgValueCandidates { + let f = std::sync::Arc::new(f); + + ArgValueCandidates::new(move || { + let f = f.clone(); + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { f().await }) + }) + .join() + .into_iter() + .flatten() + .map(CompletionCandidate::new) + .collect::>() + }) +} diff --git a/src/logger.rs b/src/logger.rs index 16e0742..ef9e9c6 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -2,12 +2,13 @@ use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder}; use chrono::{DurationRound, TimeDelta, Utc}; use eyre::{format_err, Result}; use log::{debug, error, warn}; +use std::ffi::OsStr; use std::path::{Path, PathBuf}; use std::process::Stdio; use tokio::{ fs::File, io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, - process, + process::{Child, Command}, sync::mpsc, time::{sleep, Duration}, }; @@ -22,67 +23,92 @@ const TRUNC_DELTA: TimeDelta = TimeDelta::hours(1); const FLUSH_INTERVAL: Duration = Duration::from_secs(1); const WRITE_RETRY_DELAY: Duration = Duration::from_secs(1); -pub struct Logger<'t> { - pub log_path: &'t str, - pub log_name: &'t str, +pub struct Logger { + pub log_path: PathBuf, + pub log_name: PathBuf, pub with_prefix: bool, + pub cgroup: Option, } -impl<'t> Logger<'t> { - pub async fn run(&self, cgroup: Option, command: &str, args: &[String]) -> Result<()> { +impl Logger { + pub async fn setup(&self, command: impl AsRef, args: I) -> fs::Result + where + I: IntoIterator, + S: AsRef, + { // make sure we can at least open the log before starting the command - let archives_path = &format!("{path}/archives", path = self.log_path); - (fs::create_dir_all(archives_path).await) - .map_err(|e| format_err!("failed to create archives dir: {e}"))?; - let archives_read_dir = (fs::read_dir(archives_path).await) - .map_err(|e| format_err!("failed to list archives: {e}"))?; + let archives_path = &self.log_path.join("archives"); + fs::create_dir_all(archives_path).await?; - let mut prev_stamp = trunc_ts(Utc::now()); - let mut current_log = BufWriter::new(self.open_log(prev_stamp).await?); + let archives_read_dir = fs::read_dir(archives_path).await?; + + let prev_stamp = trunc_ts(Utc::now()); tokio::spawn(compress_archives( archives_read_dir, - self.log_name.to_string(), + self.log_name.clone(), prev_stamp.format(TS_FORMAT).to_string(), )); - // start the command - let mut cmd = process::Command::new(command); - cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); - if let Some(cgroup) = cgroup.as_deref() { - let mut cg_path = PathBuf::from(cgroup::ROOT); - cg_path.push(cgroup); - cg_path.push(self.log_name); + // create the command + let mut cmd = Command::new(command); - use std::io::ErrorKind as K; - match tokio::fs::create_dir(&cg_path).await { - Ok(_) => debug!("created dir {}", cg_path.display()), - Err(e) if e.kind() == K::AlreadyExists => { - debug!("existing dir {}", cg_path.display()) - } - Err(e) => return Err(fs::Error::CreateDir(cg_path, e).into()), - } + cmd.args(args); + + if let Some(cgroup) = self.cgroup.as_deref() { + let cg_path = PathBuf::from(cgroup::ROOT) + .join(cgroup) + .join(&self.log_name); + + fs::create_dir_all(&cg_path).await?; let procs_file = cg_path.join("cgroup.procs"); debug!("procs file {}", procs_file.display()); - fs::write(&procs_file, b"0").await?; + + unsafe { cmd.pre_exec(move || std::fs::write(&procs_file, b"0")) }; } - let mut child = cmd.spawn().map_err(|e| format_err!("exec failed: {e}"))?; + Ok(cmd) + } - let (tx, mut rx) = mpsc::channel(8); + pub fn spawn(self, mut cmd: Command) -> std::io::Result { + // setup outputs for capture + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + + // spawn + let mut child = cmd.spawn()?; + + // capture outputs + let (tx, rx) = mpsc::channel(8); tokio::spawn(copy("stdout", child.stdout.take().unwrap(), tx.clone())); tokio::spawn(copy("stderr", child.stderr.take().unwrap(), tx)); + // log outputs + tokio::spawn(self.log_stream(rx)); + + Ok(child) + } + + // TODO: Result when stable + pub async fn exec(self, cmd: Command) -> Result<()> { + let mut child = self.spawn(cmd)?; + // forward signals if let Some(child_pid) = child.id() { forward_signals_to(child_pid as i32); } - // handle output + let status = child.wait().await?; + std::process::exit(status.code().unwrap_or(-1)); + } + + async fn log_stream(self, mut rx: mpsc::Receiver) { let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); + let mut prev_stamp = trunc_ts(Utc::now()); + let mut current_log = BufWriter::new(self.eventually_open_log(prev_stamp).await); + loop { tokio::select!( r = rx.recv() => { @@ -102,15 +128,11 @@ impl<'t> Logger<'t> { ); } - let status = child.wait().await?; - // finalize - while let Err(e) = current_log.flush().await { - error!("final log flush failed: {e}"); + while let Err(e) = current_log.shutdown().await { + error!("final log shutdown failed: {e}"); sleep(WRITE_RETRY_DELAY).await; } - - std::process::exit(status.code().unwrap_or(-1)); } async fn log_item( @@ -143,6 +165,18 @@ impl<'t> Logger<'t> { Ok(()) } + async fn eventually_open_log(&self, ts: Timestamp) -> File { + loop { + match self.open_log(ts).await { + Ok(log) => break log, + Err(e) => { + error!("open log failed: {e}"); + sleep(WRITE_RETRY_DELAY).await; + } + } + } + } + async fn open_log(&self, ts: Timestamp) -> Result { let log_file = &self.archive_path(ts); @@ -153,8 +187,9 @@ impl<'t> Logger<'t> { .open(log_file) .await?; - let link_src = &PathBuf::from(self.log_path) - .join(self.log_name) + let link_src = &self + .log_path + .join(&self.log_name) .with_added_extension("log"); let link_tgt = &self.archive_rel_path(ts); @@ -171,17 +206,15 @@ impl<'t> Logger<'t> { } fn archive_path(&self, ts: Timestamp) -> PathBuf { - PathBuf::from(self.log_path).join(self.archive_rel_path(ts)) + self.log_path.join(self.archive_rel_path(ts)) } fn archive_rel_path(&self, ts: Timestamp) -> PathBuf { PathBuf::from("archives").join(self.archive_file(ts)) } - fn archive_file(&self, ts: Timestamp) -> String { - format!( - "{name}.{ts}.log", - name = self.log_name, - ts = ts.format(TS_FORMAT), - ) + fn archive_file(&self, ts: Timestamp) -> PathBuf { + self.log_name + .with_added_extension(ts.format(TS_FORMAT).to_string()) + .with_added_extension("log") } } @@ -266,7 +299,7 @@ async fn copy(stream_name: &'static str, out: impl AsyncRead + Unpin, tx: mpsc:: line.extend_from_slice(buf); out.consume(len); send_line!(); - } else if buf.len() > remaining { + } else if buf.len() >= remaining { line.extend_from_slice(&buf[..remaining]); out.consume(remaining); send_line!(); @@ -287,7 +320,12 @@ pub fn trunc_ts(ts: Timestamp) -> Timestamp { .expect("duration_trunc failed") } -async fn compress_archives(mut read_dir: fs::ReadDir, log_name: String, exclude_ts: String) { +async fn compress_archives( + mut read_dir: fs::ReadDir, + log_name: impl AsRef, + exclude_ts: String, +) { + let log_name = log_name.as_ref(); loop { let Ok(Some(entry)) = (read_dir.next_entry().await).inspect_err(|e| error!("archive dir read failed: {e}")) diff --git a/src/rc.rs b/src/rc.rs index 8b13789..54b1631 100644 --- a/src/rc.rs +++ b/src/rc.rs @@ -1 +1,495 @@ +use eyre::format_err; +use log::{error, info, warn}; +use nix::sys::signal::Signal; +use std::collections::{BTreeMap as Map, BTreeSet as Set}; +use std::path::PathBuf; +use std::sync::LazyLock; +use tokio::{ + io::{copy, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, + net::{UnixListener, UnixStream}, + sync::{mpsc, watch, RwLock}, +}; +use crate::{cgroup, fs}; + +mod runner; + +use runner::{Child, State}; + +const CFG_PATH: &str = "/etc/direktil/rc.yaml"; +const SOCK_PATH: &str = "/run/dkl-rc/ctl.sock"; // Path::new when stable + +#[derive(Default, serde::Serialize, serde::Deserialize)] +pub struct Config { + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub cgroups: Map, +} + +#[derive(serde::Serialize, serde::Deserialize)] +pub struct CgroupConfig { + pub controllers: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub settings: Map, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub services: Map, +} + +pub type Service = Vec; + +static MANAGER: LazyLock> = LazyLock::new(|| RwLock::new(Manager::default())); + +type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +enum Error { + #[error("invalid command: {0:?}")] + InvalidCommand(String), + #[error("config read failed: {0}")] + ConfigRead(fs::Error), + #[error("config parse failed: {0}")] + ConfigParse(serde_yaml::Error), + #[error("cgroup setup failed: {0}")] + CgroupSetup(fs::Error), + #[error("invalid key (cgroup/service)")] + InvalidKey, + #[error("unknown cgroup: {0:?}")] + UnknownCgroup(String), + #[error("unknown service: {0:?}")] + UnknownService(String), + #[error("invalid signal: {0:?}")] + InvalidSignal(String), + #[error("process exited")] + ProcessExited, + #[error("nothing running under {0:?}")] + NotRunning(String), + #[error("kill failed: {0:?}")] + KillFailed(nix::Error), + #[error("service runner is dead")] + RunnerDead, +} + +pub async fn run() -> eyre::Result<()> { + info!("starting"); + + tokio::spawn(wait_terminate()); + + let _ = reload_config().await; + tokio::spawn(wait_reload()); + + if let Some(sock_dir) = PathBuf::from(SOCK_PATH).parent() { + let _ = tokio::fs::DirBuilder::new() + .mode(0o700) + .create(sock_dir) + .await; + } + + let _ = tokio::fs::remove_file(SOCK_PATH).await; + let listener = UnixListener::bind(SOCK_PATH)?; + + loop { + let Ok((conn, _)) = listener.accept().await else { + warn!("listener closed"); + break; + }; + + tokio::spawn(async move { handle(conn).await }); + } + + cleanup().await; + Ok(()) +} + +async fn cleanup() { + let _ = tokio::fs::remove_file(SOCK_PATH).await; +} + +pub async fn ctl(args: I) -> eyre::Result<()> +where + I: IntoIterator, + S: Into, +{ + let args: Vec<_> = args.into_iter().map(|s| s.into()).collect(); + let args = format!("{}\n", args.join(" ")); + + match ctl_exec(args.as_bytes()).await { + Ok(mut rd) => { + copy(&mut rd, &mut tokio::io::stdout()).await?; + std::process::exit(0); + } + Err(e) => { + eprint!("{e}"); + std::process::exit(1); + } + } +} + +async fn ctl_exec(request: &[u8]) -> eyre::Result> { + let mut conn = UnixStream::connect(SOCK_PATH) + .await + .map_err(|e| format_err!("{SOCK_PATH}: {e}"))?; + + conn.write_all(request).await?; + + let mut rd = BufReader::with_capacity(64, conn); + + let mut code = String::new(); + rd.read_line(&mut code).await?; + let code: i32 = code.trim_ascii().parse()?; + + if code != 0 { + let mut err = String::new(); + rd.read_to_string(&mut err).await?; + return Err(format_err!("{}", err.trim_ascii_end())); + } + + Ok(rd) +} + +async fn handle(mut conn: UnixStream) { + let (rd, mut wr) = conn.split(); + let mut rd = BufReader::with_capacity(64, rd).lines(); + + let Ok(Some(line)) = rd.next_line().await else { + return; + }; + + let mut line = line.split_ascii_whitespace(); + + macro_rules! next { + () => {{ + match line.next() { + Some(v) => v, + None => return, + } + }}; + } + + let r = match next!() { + "ls" => Ok(Some(ls().await)), + "status" => Ok(Some(status().await)), + "reload-config" => reload_config().await.map(|_| None), + "start" => start(next!()).await.map(|_| None), + "stop" => stop(next!()).await.map(|_| None), + "reload" => reload(next!()).await.map(|_| None), + "sig" => sig(next!(), next!()).await.map(|_| None), + cmd => Err(Error::InvalidCommand(cmd.into())), + }; + + let _ = match r { + Ok(None) => wr.write_all(b"0\n").await, + Ok(Some(s)) => wr.write_all(format!("0\n{s}\n").as_bytes()).await, + Err(e) => wr.write_all(format!("1\n{e}\n").as_bytes()).await, + }; + + let _ = wr.shutdown().await; +} + +async fn wait_terminate() { + use tokio::signal::unix::{signal, SignalKind}; + let Ok(mut sig) = signal(SignalKind::terminate()) + .inspect_err(|e| error!("failed to listen to SIGTERM (will be ignored): {e}")) + else { + return; + }; + + sig.recv().await; + + info!("SIGTERM received, terminating"); + MANAGER.write().await.terminate().await; + + cleanup().await; + log::logger().flush(); + std::process::exit(0); +} + +async fn wait_reload() { + use tokio::signal::unix::{signal, SignalKind}; + let Ok(mut sig) = signal(SignalKind::hangup()) + .inspect_err(|e| error!("failed to listen to SIGHUP (will be ignored): {e}")) + else { + return; + }; + + loop { + sig.recv().await; + let _ = reload_config().await; + } +} + +async fn reload_config() -> Result<()> { + let cfg = (fs::read(CFG_PATH).await) + .map_err(Error::ConfigRead) + .inspect_err(|e| error!("{e}"))?; + + let cfg = serde_yaml::from_slice::(&cfg) + .map_err(Error::ConfigParse) + .inspect_err(|e| error!("{CFG_PATH}: {e}"))?; + + info!("applying new config"); + let r = MANAGER.write().await.apply_config(cfg).await; + match &r { + Ok(_) => info!("applied new config"), + Err(e) => info!("failed to apply new config: {e}"), + } + r +} + +async fn ls() -> String { + let mut keys = String::new(); + for (i, k) in MANAGER.read().await.runners.keys().enumerate() { + if i != 0 { + keys.push('\n'); + } + keys.push_str(k); + } + keys +} + +async fn status() -> String { + let status = MANAGER.read().await.status(); + + let mut table = tabled::builder::Builder::new(); + table.push_record(["cgroup", "service", "PID", "state", "msg"]); + + for (cg_svc, child) in status { + let (cg, svc) = cg_svc.split_once('/').unwrap(); + let pid = child.pid.map(|p| p.to_string()); + table.push_record([ + cg, + svc, + pid.as_deref().unwrap_or("◌"), + &format!("{:?}", child.state), + child.msg.as_deref().unwrap_or("◌"), + ]); + } + + (table.build()) + .with(tabled::settings::Style::psql()) + .to_string() +} + +async fn start(key: &str) -> Result<()> { + MANAGER.write().await.start(key).await +} + +async fn stop(key: &str) -> Result<()> { + MANAGER.write().await.stop(key).await +} + +async fn reload(key: &str) -> Result<()> { + MANAGER.read().await.reload(key).await +} + +async fn sig(key: &str, sig: &str) -> Result<()> { + let sig: Signal = sig.parse().map_err(|_| Error::InvalidSignal(sig.into()))?; + signal(key, sig).await +} + +async fn child_for(key: &str) -> Result { + MANAGER.read().await.child_for(key) +} + +async fn signal(key: &str, sig: Signal) -> Result<()> { + child_for(key).await?.kill(sig) +} + +fn child_key(cg: &str, svc: &str) -> String { + [cg, svc].join("/") +} +fn split_key(key: &str) -> Result<(&str, &str)> { + key.split_once('/').ok_or(Error::InvalidKey) +} + +#[derive(Default)] +struct Manager { + cfg: Config, + procs: Map>, + runners: Map>, +} + +impl Manager { + fn status(&self) -> Vec<(String, Child)> { + (self.procs.iter()) + .map(|(n, c)| (n.clone(), c.borrow().clone())) + .collect() + } + + fn child_for(&self, key: &str) -> Result { + (self.procs.get(key)) + .map(|c| c.borrow().clone()) + .ok_or_else(|| Error::NotRunning(key.into())) + } + + async fn apply_config(&mut self, new_cfg: Config) -> Result<()> { + // create and configure cgroups + for (name, cg) in &new_cfg.cgroups { + let cg_path = PathBuf::from(cgroup::ROOT).join(name); + fs::create_dir_all(&cg_path) + .await + .map_err(Error::CgroupSetup)?; + + fs::write( + cg_path.join("cgroup.subtree_control"), + cg.controllers.as_bytes(), + ) + .await + .map_err(Error::CgroupSetup)?; + + for (setting, value) in &cg.settings { + fs::write(cg_path.join(setting), value.as_bytes()) + .await + .map_err(Error::CgroupSetup)?; + } + } + + let new_svcs: Set<_> = new_cfg.service_keys().collect(); + + // stop removed services + let to_stop = Map::from_iter(self.runners.extract_if(.., |k, _| !new_svcs.contains(k))); + let mut stopped = Set::new(); + for (key, runner_cmd) in to_stop { + if runner_cmd.send(runner::Cmd::Stop).await.is_err() { + // runner already dead + continue; + } + stopped.insert(key); + } + + // start added services + for (key, cg, svc, service) in new_cfg.services() { + if self.runners.contains_key(&key) { + continue; + }; + + let cmd = self.spawn_runner(key, cg, svc, service.clone()); + if let Err(e) = cmd.send(runner::Cmd::Start).await { + error!("runner instantly died: {e}"); + } + } + + // wait & cleanup stopped + for key in stopped { + let Some(mut child_rx) = self.procs.remove(&key) else { + continue; + }; + let _ = child_rx + .wait_for(|c| matches!(c.state, State::Finalized)) + .await; + } + + self.cfg = new_cfg; + Ok(()) + } + + async fn terminate(&mut self) { + self.runners.clear(); + + for child in self.procs.values_mut() { + let _ = child + .wait_for(|c| matches!(c.state, State::Finalized)) + .await; + } + + self.procs.clear(); + } + + fn runner(&mut self, key: &str) -> Result> { + if let Some(c) = self.runners.get(key) { + return Ok(c.clone()); + } + + let (cg, svc) = split_key(key)?; + let service = self.cfg.service(key)?; + + Ok(self.spawn_runner(key.into(), cg, svc, service.clone())) + } + + fn spawn_runner( + &mut self, + key: String, + cg: &str, + svc: &str, + service: Service, + ) -> mpsc::Sender { + let (runner, child_rx, cmds_tx) = runner::new(cg, svc, service); + + tokio::spawn(runner.run()); + + self.procs.insert(key.clone(), child_rx); + self.runners.insert(key, cmds_tx.clone()); + + cmds_tx + } + + async fn cmd(&mut self, key: &str, cmd: runner::Cmd) -> Result<()> { + if self.runner(key)?.send(cmd).await.is_err() { + // runner died + self.runners.remove(key); + return Err(Error::RunnerDead); + } + Ok(()) + } + + async fn start(&mut self, key: &str) -> Result<()> { + self.cmd(key, runner::Cmd::Start).await + } + + async fn stop(&mut self, key: &str) -> Result<()> { + self.cmd(key, runner::Cmd::Stop).await + } + + async fn reload(&self, key: &str) -> Result<()> { + let proc = (self.procs.get(key)) // + .ok_or_else(|| Error::UnknownService(key.into()))?; + proc.borrow().reload() + } +} + +impl Config { + fn cgroup(&self, cg: &str) -> Result<&CgroupConfig> { + self.cgroups + .get(cg) + .ok_or_else(|| Error::UnknownCgroup(cg.into())) + } + + fn service(&self, key: &str) -> Result<&Service> { + let (cg, svc) = split_key(key)?; + self.cgroup(cg)?.service(svc) + } + + fn service_keys(&self) -> impl Iterator { + (self.cgroups.iter()) + .map(|(cg_name, cg)| cg.services.keys().map(move |n| child_key(cg_name, n))) + .flatten() + } + + fn services(&self) -> impl Iterator { + (self.cgroups.iter()) + .map(|(cg_name, cg)| { + cg.services + .iter() + .map(move |(n, service)| (child_key(cg_name, n), cg_name, n, service)) + }) + .flatten() + } +} + +impl CgroupConfig { + fn service(&self, svc: &str) -> Result<&Vec> { + self.services + .get(svc) + .ok_or_else(|| Error::UnknownService(svc.into())) + } +} + +pub async fn complete() -> Vec { + let mut r = vec![]; + let Ok(rd) = ctl_exec(b"ls\n").await else { + return r; + }; + let mut rd = rd.lines(); + while let Some(line) = rd.next_line().await.ok().flatten() { + r.push(line); + } + r +} diff --git a/src/rc/runner.rs b/src/rc/runner.rs new file mode 100644 index 0000000..8851f78 --- /dev/null +++ b/src/rc/runner.rs @@ -0,0 +1,263 @@ +use log::{error, warn}; +use nix::{ + sys::signal::{kill, Signal}, + unistd::Pid, +}; +use std::num::NonZero; +use tokio::{ + process, select, + sync::{mpsc, watch}, + time::{sleep, sleep_until, Duration, Instant}, +}; + +use super::{Error, Result, Service}; +use crate::logger::Logger; + +const LOG_PATH: &str = "/var/log"; + +const TERM_DELAY: Duration = Duration::from_secs(30); +const KILL_DELAY: Duration = Duration::from_secs(10); +const RESTART_DELAY: Duration = Duration::from_secs(8); + +#[derive(Debug, Clone, Copy)] +pub enum Cmd { + Start, + Stop, +} + +#[derive(Default, Clone, Copy, Debug)] +pub enum State { + #[default] + NeverStarted, + Starting, + Running, + Crashed, + Stopping, + Stopped, + Finalized, +} + +pub fn new( + cg: impl Into, + svc: impl Into, + service: Service, +) -> (Runner, watch::Receiver, mpsc::Sender) { + let (manager, child_rx) = ProcessManager::new(service); + let (cmds_tx, cmds_rx) = mpsc::channel(1); + let r = Runner { + cg: cg.into(), + svc: svc.into(), + cmds_rx, + manager, + }; + (r, child_rx, cmds_tx) +} + +pub struct Runner { + cg: String, + svc: String, + cmds_rx: mpsc::Receiver, + manager: ProcessManager, +} + +impl Runner { + pub async fn run(mut self) { + self.manager.update(State::NeverStarted); + + loop { + let cmd = select! { + cmd = self.manager.manage() => { + cmd + } + cmd = self.cmds_rx.recv() => { + let Some(cmd) = cmd else { + break; // command side dropped + }; + Some(cmd) + } + }; + + if let Some(cmd) = cmd { + self.process_cmd(cmd).await; + } + } + + self.process_cmd(Cmd::Stop).await; + self.manager.update(State::Finalized); + } + + async fn process_cmd(&mut self, cmd: Cmd) { + let cg = &self.cg; + let svc = &self.svc; + + match cmd { + Cmd::Start => { + self.manager.start(cg, svc).await; + } + Cmd::Stop => { + self.manager.stop().await; + } + } + } +} + +struct ProcessManager { + service: Service, + child_tx: watch::Sender, + process: Option, + restart_deadline: Option, +} + +impl ProcessManager { + fn new(service: Service) -> (Self, watch::Receiver) { + let (child_tx, child_rx) = watch::channel(Child::default()); + let pm = Self { + service, + child_tx, + process: None, + restart_deadline: None, + }; + (pm, child_rx) + } + + /// runs a management iteration (ie: waiting for the child or a restart deadline). + async fn manage(&mut self) -> Option { + if let Some(process) = self.process.as_mut() { + let msg = match process.wait().await { + Ok(status) => status.to_string(), + Err(e) => e.to_string(), + }; + self.crashed(msg); + self.process = None; + self.restart_deadline = Some(Instant::now() + RESTART_DELAY); + None + } else if let Some(deadline) = self.restart_deadline { + sleep_until(deadline).await; + Some(Cmd::Start) + } else { + std::future::pending().await + } + } + + async fn start(&mut self, cg: &str, svc: &str) { + if self.process.is_some() { + return; + } + + self.update(State::Starting); + + let logger = Logger { + log_path: LOG_PATH.into(), + log_name: svc.into(), + with_prefix: false, + cgroup: Some(cg.into()), + }; + + let mut args = self.service.iter(); + let Some(cmd) = args.next() else { + error!("{cg}/{svc}: empty command"); + return; + }; + + let Ok(cmd) = (logger.setup(cmd, args).await) + .inspect_err(|e| self.crashed(format!("setup failed: {e}"))) + else { + return; + }; + + let Ok(child) = logger + .spawn(cmd) + .inspect_err(|e| self.crashed(format!("exec failed: {e}"))) + else { + return; + }; + + self.process = Some(child); + self.restart_deadline = None; + self.update(State::Running); + } + + async fn stop(&mut self) { + self.restart_deadline = None; + + let Some(mut process) = self.process.take() else { + return; + }; + let Some(pid) = process.id() else { + let _ = process.wait().await; // already dead, reap it + self.update(State::Stopped); + return; + }; + + let pid = pid as i32; + self.update_full(pid, State::Stopping, None); + let pid = Pid::from_raw(pid); + + let _ = kill(pid, Signal::SIGTERM).inspect_err(|e| error!("kill -TERM {pid} failed: {e}")); + + select! { + _ = process.wait() => { + self.update(State::Stopped); + return + }, + _ = sleep(TERM_DELAY) => { + warn!("process {pid} did not exit during the grace period, killing"); + let _ = process.kill().await.inspect_err(|e| error!("kill -KILL {pid} failed: {e}")); + } + } + + select! { + _ = process.wait() => { + self.update(State::Stopped); + return + }, + _ = sleep(KILL_DELAY) => { + error!("process {pid} still alive after SIGKILL"); + } + } + } + + fn process_pid(&self) -> i32 { + (self.process.as_ref()) + .and_then(|c| Some(c.id()? as i32)) + .unwrap_or(0) + } + + fn update(&self, state: State) { + let pid = self.process_pid(); + self.update_full(pid, state, None); + } + + fn update_full(&self, pid: i32, state: State, msg: Option) { + self.child_tx.send_replace(Child { + pid: NonZero::new(pid), + state, + msg, + }); + } + + fn crashed(&self, msg: String) { + let pid = self.process_pid(); + self.update_full(pid, State::Crashed, Some(msg)); + } +} + +#[derive(Clone, Default)] +pub struct Child { + pub pid: Option>, + pub state: State, + pub msg: Option, +} + +impl Child { + pub fn reload(&self) -> Result<()> { + self.kill(Signal::SIGHUP) + } + + pub fn kill(&self, sig: Signal) -> Result<()> { + let Some(pid) = self.pid else { + return Err(Error::ProcessExited); + }; + kill(Pid::from_raw(pid.into()), sig).map_err(|e| Error::KillFailed(e)) + } +}