first commit

This commit is contained in:
Beyhan Oğur
2026-04-26 22:29:38 +03:00
commit 427856cd3a
176 changed files with 27613 additions and 0 deletions

32
crates/svc/Cargo.toml Normal file
View File

@@ -0,0 +1,32 @@
[package]
name = "svc"
version = "0.1.0"
edition.workspace = true
authors.workspace = true
rust-version.workspace = true
description.workspace = true
readme.workspace = true
repository.workspace = true
license.workspace = true
categories.workspace = true
keywords.workspace = true
[dependencies]
async-trait = "0.1.86"
futures = { version = "0.3.31", default-features = false, features = ["alloc"] }
log = { version = "0.4.26", optional = true }
serde = { version = "1.0.218", features = ["derive"] }
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "sync", "time", "tokio-macros"], optional = true }
uuid = { version = "1.14.0", features = [] }
[features]
default = ["manager"]
manager = ["dep:log", "dep:tokio", "uuid/v4"]
[lints]
workspace = true
[dev-dependencies]
pretty_env_logger = "0.5.0"

View File

@@ -0,0 +1,50 @@
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use svc::error::SvcResult;
use svc::manager::ServiceManager;
#[derive(Error, Debug)]
pub enum SimpleError {
#[error("That didn't work")]
Nope,
}
async fn run() -> Result<(), SimpleError> {
let dur = Duration::from_millis(800);
println!("Hello");
println!("1");
sleep(dur).await;
println!("2");
sleep(dur).await;
println!("3");
Ok(())
}
#[tokio::main]
async fn main() -> SvcResult<()> {
pretty_env_logger::formatted_builder()
.filter_level(log::LevelFilter::Debug)
.parse_default_env()
.init();
let (mut client, future) = ServiceManager::spawn();
client.register_function("foo", run()).await?;
client.start("foo").await?;
println!("main: service configured");
client.wait_for_start("foo").await?;
println!("main: service started");
client.shutdown().await?;
future.await??;
println!("main: service stopped");
Ok(())
}

View File

@@ -0,0 +1,70 @@
use std::time::Duration;
use async_trait::async_trait;
use svc::policy::{Policy, Retry};
use svc::runservice::StandardService;
use thiserror::Error;
use svc::error::SvcResult;
use svc::manager::ServiceManager;
use svc::traits::{Service, ServiceState};
#[derive(Clone)]
struct PolicyService {
counter: u32,
}
#[derive(Error, Debug)]
pub enum Error {
#[error("Not done yet")]
MoreToDo,
}
#[async_trait]
impl Service for PolicyService {
type Error = Error;
async fn run(&mut self) -> Result<(), Error> {
println!("Hello {}", self.counter);
self.counter += 1;
// returning an Err will invoke the policy for the Running state
Err(Error::MoreToDo)
}
}
#[tokio::main]
async fn main() -> SvcResult<()> {
const NAME: &str = "policy-service";
pretty_env_logger::formatted_builder()
.filter_level(log::LevelFilter::Debug)
.parse_default_env()
.init();
let (mut client, future) = ServiceManager::spawn();
let svc = PolicyService { counter: 0 };
// Manually construct a ServiceRunner, and set a specific policy for
// handling errors during .run()
let svcr = StandardService::new(NAME, svc).with_run_policy(
// Try up to 5 times, waiting 300ms between each attempt
Policy::new()
.with_retry(Retry::Limit(5))
.with_delay(Duration::from_millis(300)),
);
let uuid = client.register(NAME, svcr).await?;
client.start(uuid).await?;
println!("main: service will attempt to run 5 times");
client.wait_for_start(uuid).await?;
println!("main: service started");
client.wait_for_state(uuid, ServiceState::Failed).await?;
client.shutdown().await?;
future.await??;
Ok(())
}

View File

@@ -0,0 +1,56 @@
use std::time::Duration;
use tokio::time::sleep;
use svc::error::{RunSvcError, SvcResult};
use svc::manager::ServiceManager;
async fn run() -> Result<(), RunSvcError> {
let dur = Duration::from_millis(200);
println!("Hello");
let mut counter = 0;
loop {
println!("{counter}");
sleep(dur).await;
counter += 1;
}
}
#[tokio::main]
async fn main() -> SvcResult<()> {
pretty_env_logger::formatted_builder()
.filter_level(log::LevelFilter::Debug)
.parse_default_env()
.init();
let (mut client, future) = ServiceManager::spawn();
client.register_function("foo", run()).await?;
client.start("foo").await?;
println!("main: service configured");
client.wait_for_start("foo").await?;
println!("main: service started");
sleep(Duration::from_millis(1000)).await;
client.stop("foo").await?;
client.wait_for_stop("foo").await?;
println!("main: service stopped");
client.start("foo").await?;
client.wait_for_start("foo").await?;
println!("main: service started");
sleep(Duration::from_millis(1000)).await;
client.shutdown().await?;
future.await??;
Ok(())
}

View File

@@ -0,0 +1,86 @@
use std::time::Duration;
use async_trait::async_trait;
use thiserror::Error;
use tokio::time::sleep;
use svc::error::SvcResult;
use svc::manager::ServiceManager;
use svc::traits::Service;
#[derive(Clone)]
struct Simple {
name: String,
counter: u32,
}
#[derive(Error, Debug)]
pub enum SimpleError {
#[error("That didn't work..")]
Nope,
}
#[async_trait]
impl Service for Simple {
type Error = SimpleError;
async fn run(&mut self) -> Result<(), SimpleError> {
let dur = Duration::from_millis(300);
println!("Hello from {}", self.name);
println!("1");
sleep(dur).await;
println!("2");
sleep(dur).await;
println!("3");
println!("Done running. Now going to stop (this will fail the first time)");
Ok(())
}
async fn stop(&mut self) -> Result<(), SimpleError> {
self.counter += 1;
// pretend this service doesn't succeed at stopping right away
if self.counter > 1 {
Ok(())
} else {
Err(SimpleError::Nope)
}
}
}
#[tokio::main]
async fn main() -> SvcResult<()> {
pretty_env_logger::formatted_builder()
.filter_level(log::LevelFilter::Debug)
.parse_default_env()
.init();
let (mut client, future) = ServiceManager::spawn();
let svc = Simple {
name: "Simple Service".to_string(),
counter: 0,
};
client.register_service("foo", svc).await?;
client.start("foo").await?;
println!("main: service configured");
client.wait_for_start("foo").await?;
println!("main: service started");
client.wait_for_stop("foo").await?;
println!("main: service stopped");
client.shutdown().await?;
future.await??;
Ok(())
}

84
crates/svc/src/error.rs Normal file
View File

@@ -0,0 +1,84 @@
use std::error::Error;
use thiserror::Error;
use crate::manager::{ServiceEvent, SvmRequest};
use crate::serviceid::{ServiceId, ServiceName};
use crate::traits::ServiceState;
#[derive(Error, Debug)]
pub enum SvcError {
/* mapped errors */
#[error(transparent)]
FromUtf8Error(#[from] std::string::FromUtf8Error),
#[error(transparent)]
IOError(#[from] std::io::Error),
#[error(transparent)]
TryFromIntError(#[from] std::num::TryFromIntError),
#[error(transparent)]
UuidError(#[from] uuid::Error),
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
MpscSendError(#[from] tokio::sync::mpsc::error::SendError<SvmRequest>),
#[error(transparent)]
MpscSendEventError(#[from] tokio::sync::mpsc::error::SendError<ServiceEvent>),
#[error(transparent)]
WatchSendError(#[from] tokio::sync::watch::error::SendError<ServiceState>),
#[error(transparent)]
WatchRecvError(#[from] tokio::sync::watch::error::RecvError),
#[error(transparent)]
OneshotRecvError(#[from] tokio::sync::oneshot::error::RecvError),
#[error("Service {0:?} not registered")]
ServiceNotFound(ServiceId),
#[error("Service {0} already exists")]
ServiceAlreadyExists(ServiceName),
#[error("All services stopped")]
Shutdown,
#[error("Service has failed")]
ServiceFailed,
#[error("Templated service generation failed")]
ServiceGeneration(Box<dyn Error + Send>),
}
impl SvcError {
pub fn generation(err: impl Error + Send + 'static) -> Self {
Self::ServiceGeneration(Box::new(err))
}
}
#[derive(Error, Debug)]
pub enum RunSvcError {
/* mapped errors */
#[error(transparent)]
MpscSendError(#[from] tokio::sync::mpsc::error::SendError<SvmRequest>),
#[error(transparent)]
WatchSendError(#[from] tokio::sync::watch::error::SendError<ServiceState>),
#[error(transparent)]
MpscSendEventError(#[from] tokio::sync::mpsc::error::SendError<ServiceEvent>),
#[error(transparent)]
WatchRecvError(#[from] tokio::sync::watch::error::RecvError),
/* errors from run service */
#[error(transparent)]
ServiceError(Box<dyn Error + Send>),
}
pub type SvcResult<T> = Result<T, SvcError>;

14
crates/svc/src/lib.rs Normal file
View File

@@ -0,0 +1,14 @@
pub mod policy;
pub mod serviceid;
pub mod traits;
#[cfg(feature = "manager")]
pub mod error;
#[cfg(feature = "manager")]
pub mod manager;
#[cfg(feature = "manager")]
pub mod rpc;
#[cfg(feature = "manager")]
pub mod runservice;
#[cfg(feature = "manager")]
pub mod template;

574
crates/svc/src/manager.rs Normal file
View File

@@ -0,0 +1,574 @@
#![allow(clippy::future_not_send)]
//! A [`ServiceManager`] manages a collection of [`Service`] instances.
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::time::Duration;
use futures::future::BoxFuture;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::task::{AbortHandle, JoinHandle, JoinSet};
use uuid::Uuid;
use crate::error::{RunSvcError, SvcError, SvcResult};
use crate::rpc::RpcRequest;
use crate::runservice::StandardService;
use crate::serviceid::{IntoServiceId, ServiceId, ServiceName};
use crate::template::ServiceTemplate;
use crate::traits::{Service, ServiceRunner, ServiceState};
#[derive(Debug)]
pub struct ServiceInstance {
tx: watch::Sender<ServiceState>,
name: ServiceName,
state: ServiceState,
abort_handle: AbortHandle,
}
pub type ServiceFunc = Box<
dyn FnOnce(
Uuid,
watch::Receiver<ServiceState>,
mpsc::UnboundedSender<ServiceEvent>,
) -> BoxFuture<'static, Result<(), RunSvcError>>
+ Send,
>;
#[derive(Debug, Clone, Copy)]
pub struct ServiceEvent {
id: Uuid,
state: ServiceState,
}
impl ServiceEvent {
#[must_use]
pub const fn new(id: Uuid, state: ServiceState) -> Self {
Self { id, state }
}
#[must_use]
pub const fn id(&self) -> Uuid {
self.id
}
#[must_use]
pub const fn state(&self) -> ServiceState {
self.state
}
}
/// A request to a [`ServiceManager`]
pub enum SvmRequest {
Stop(RpcRequest<ServiceId, SvcResult<Uuid>>),
Start(RpcRequest<ServiceId, SvcResult<Uuid>>),
Status(RpcRequest<ServiceId, SvcResult<ServiceState>>),
List(RpcRequest<(), Vec<(Uuid, ServiceName)>>),
Resolve(RpcRequest<ServiceId, SvcResult<Uuid>>),
LookupName(RpcRequest<ServiceId, SvcResult<ServiceName>>),
Register(RpcRequest<(String, ServiceFunc), SvcResult<Uuid>>),
RegisterTemplate(RpcRequest<(String, Box<dyn ServiceTemplate>), SvcResult<()>>),
Subscribe(RpcRequest<mpsc::UnboundedSender<ServiceEvent>, SvcResult<Uuid>>),
Shutdown(RpcRequest<(), ()>),
}
#[derive(Clone)]
pub struct SvmClient {
tx: mpsc::UnboundedSender<SvmRequest>,
}
impl SvmClient {
#[must_use]
pub const fn new(tx: mpsc::UnboundedSender<SvmRequest>) -> Self {
Self { tx }
}
pub async fn rpc<Q, A>(
&mut self,
func: impl FnOnce(RpcRequest<Q, A>) -> SvmRequest,
args: Q,
) -> SvcResult<A> {
let (rpc, rx) = RpcRequest::new(args);
self.send(func(rpc))?;
Ok(rx.await?)
}
fn send(&self, value: SvmRequest) -> SvcResult<()> {
Ok(self.tx.send(value)?)
}
pub async fn register_service<S>(&mut self, name: impl AsRef<str>, svc: S) -> SvcResult<Uuid>
where
S: Service + 'static,
{
self.register(&name, StandardService::new(&name, svc)).await
}
pub async fn register_function<F, E>(
&mut self,
name: impl AsRef<str>,
func: F,
) -> SvcResult<Uuid>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Error + Send + 'static,
{
self.register(&name, StandardService::new(&name, Box::pin(func)))
.await
}
pub async fn register<S>(&mut self, name: impl AsRef<str>, svc: S) -> SvcResult<Uuid>
where
S: ServiceRunner + Send + 'static,
{
let name = name.as_ref().to_string();
self.rpc(
SvmRequest::Register,
(name, Box::new(|a, b, c| svc.run(a, b, c))),
)
.await?
}
pub async fn register_template(
&mut self,
name: impl AsRef<str>,
generator: impl ServiceTemplate + 'static,
) -> SvcResult<()> {
let name = name.as_ref().to_string();
self.rpc(SvmRequest::RegisterTemplate, (name, Box::new(generator)))
.await?
}
pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Start, id.service_id()).await?
}
pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Stop, id.service_id()).await?
}
pub async fn resolve(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
self.rpc(SvmRequest::Resolve, id.service_id()).await?
}
pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult<ServiceName> {
self.rpc(SvmRequest::LookupName, id.service_id()).await?
}
pub async fn subscribe(&mut self) -> SvcResult<(Uuid, mpsc::UnboundedReceiver<ServiceEvent>)> {
let (tx, rx) = mpsc::unbounded_channel();
let uuid = self.rpc(SvmRequest::Subscribe, tx).await??;
Ok((uuid, rx))
}
pub async fn wait_for_state(
&mut self,
handle: impl IntoServiceId,
expected: ServiceState,
) -> SvcResult<()> {
let svc_id = self.resolve(&handle).await?;
let (_cid, mut channel) = self.subscribe().await?;
while let Some(msg) = channel.recv().await {
if msg.id == svc_id {
if msg.state == expected {
return Ok(());
}
if msg.state == ServiceState::Failed {
return Err(SvcError::ServiceFailed);
}
}
}
Err(SvcError::Shutdown)
}
pub async fn wait_for_start(
&mut self,
handle: impl IntoServiceId + Send + 'static,
) -> SvcResult<()> {
self.wait_for_state(handle, ServiceState::Running).await
}
pub async fn wait_for_stop(
&mut self,
handle: impl IntoServiceId + Send + 'static,
) -> SvcResult<()> {
self.wait_for_state(handle, ServiceState::Stopped).await
}
pub async fn status(
&mut self,
id: impl IntoServiceId + Send + 'static,
) -> SvcResult<ServiceState> {
self.rpc(SvmRequest::Status, id.service_id()).await?
}
pub async fn list(&mut self) -> SvcResult<Vec<(Uuid, ServiceName)>> {
self.rpc(SvmRequest::List, ()).await
}
pub async fn shutdown(&mut self) -> SvcResult<()> {
self.rpc(SvmRequest::Shutdown, ()).await
}
}
impl Debug for SvmRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stop(arg0) => f.debug_tuple("Stop").field(arg0).finish(),
Self::Start(arg0) => f.debug_tuple("Start").field(arg0).finish(),
Self::Status(arg0) => f.debug_tuple("Status").field(arg0).finish(),
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
Self::Register(_arg0) => f.debug_tuple("Register").field(&"<service>").finish(),
Self::RegisterTemplate(_arg0) => f
.debug_tuple("RegisterTemplate")
.field(&"<service>")
.finish(),
Self::Resolve(arg0) => f.debug_tuple("Resolve").field(arg0).finish(),
Self::LookupName(arg0) => f.debug_tuple("ResolveName").field(arg0).finish(),
Self::Subscribe(_arg0) => f.debug_tuple("Subscribe").finish(),
Self::Shutdown(_arg0) => f.debug_tuple("Shutdown").finish(),
}
}
}
pub struct ServiceManager {
control_rx: mpsc::UnboundedReceiver<SvmRequest>,
control_tx: mpsc::UnboundedSender<SvmRequest>,
service_rx: mpsc::UnboundedReceiver<ServiceEvent>,
service_tx: mpsc::UnboundedSender<ServiceEvent>,
subscribers: BTreeMap<Uuid, mpsc::UnboundedSender<ServiceEvent>>,
svcs: BTreeMap<Uuid, ServiceInstance>,
names: BTreeMap<ServiceName, Uuid>,
tasks: JoinSet<Result<(), RunSvcError>>,
templates: BTreeMap<String, Box<dyn ServiceTemplate>>,
shutdown: bool,
}
impl Default for ServiceManager {
fn default() -> Self {
Self::new()
}
}
impl ServiceManager {
#[must_use]
pub fn new() -> Self {
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (service_tx, service_rx) = mpsc::unbounded_channel();
Self {
control_tx,
control_rx,
service_tx,
service_rx,
subscribers: BTreeMap::new(),
svcs: BTreeMap::new(),
names: BTreeMap::new(),
tasks: JoinSet::new(),
templates: BTreeMap::new(),
shutdown: false,
}
}
/// Daemonize the [`ServiceManager`], returning a (clonable) [`SvmClient`] as
/// well as a [`JoinHandle`] used to control the service manager task
/// itself.
#[must_use]
pub fn daemonize(self) -> (SvmClient, JoinHandle<SvcResult<()>>) {
let client = self.client();
let fut = tokio::task::spawn(self.run());
(client, fut)
}
/// Convenience function to create and daemonize a [`ServiceManager`].
#[must_use]
pub fn spawn() -> (SvmClient, JoinHandle<SvcResult<()>>) {
Self::new().daemonize()
}
/// Create a new [`SvmClient`] connected to this service manager.
#[must_use]
pub fn client(&self) -> SvmClient {
SvmClient::new(self.handle())
}
fn handle(&self) -> mpsc::UnboundedSender<SvmRequest> {
self.control_tx.clone()
}
fn register(&mut self, name: ServiceName, svc: ServiceFunc) -> SvcResult<Uuid> {
if self.names.contains_key(&name) {
return Err(SvcError::ServiceAlreadyExists(name));
}
let (tx, rx) = watch::channel(ServiceState::Registered);
let id = Uuid::new_v4();
let abort_handle = self.tasks.spawn((svc)(id, rx, self.service_tx.clone()));
let rec = ServiceInstance {
tx,
name: name.clone(),
state: ServiceState::Registered,
abort_handle,
};
self.svcs.insert(id, rec);
self.names.insert(name, id);
Ok(id)
}
fn list(&self) -> impl Iterator<Item = &Uuid> {
self.svcs.keys()
}
fn resolve(&self, handle: impl IntoServiceId) -> SvcResult<Uuid> {
let id = handle.service_id();
match &id {
ServiceId::Name(name) => self
.names
.get(name)
.ok_or_else(|| SvcError::ServiceNotFound(id))
.copied(),
ServiceId::Id(uuid) => {
if self.svcs.contains_key(uuid) {
Ok(*uuid)
} else {
Err(SvcError::ServiceNotFound(id))
}
}
}
}
fn remove(&mut self, handle: &ServiceId) -> SvcResult<()> {
let id = self.resolve(handle)?;
self.svcs.remove(&id);
self.names.retain(|_, v| *v != id);
Ok(())
}
fn abort(&mut self, id: &ServiceId) -> SvcResult<()> {
let svc = self.get(id)?;
svc.abort_handle.abort();
self.remove(id)
}
fn get(&self, svc: impl IntoServiceId) -> SvcResult<&ServiceInstance> {
let id = self.resolve(svc)?;
Ok(&self.svcs[&id])
}
fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
let id = id.service_id();
// if the service is known, attempt to start it
if let Ok(svc) = self.get(&id) {
log::debug!("Starting service: {id} {}", &svc.name);
svc.tx.send(ServiceState::Running)?;
return self.resolve(&id);
}
// ..else, check if it's a named instance
let ServiceId::Name(svc_name) = &id else {
return Err(SvcError::ServiceNotFound(id));
};
let Some(inst) = svc_name.instance() else {
return Err(SvcError::ServiceNotFound(id));
};
let Some(tmpl) = &self.templates.get(svc_name.name()) else {
return Err(SvcError::ServiceNotFound(id));
};
let inner = tmpl.generate(inst.to_string())?;
let svc = StandardService::new(svc_name.name(), inner);
let uuid = self.register(svc_name.clone(), svc.boxed())?;
Ok(uuid)
}
fn stop(&self, id: impl IntoServiceId) -> SvcResult<Uuid> {
let id = self.resolve(id)?;
if self.svcs[&id].state == ServiceState::Stopped {
return Ok(id);
}
log::debug!("Stopping service: {id} {}", self.svcs[&id].name);
self.get(id)
.and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))?;
Ok(id)
}
fn notify_subscribers(&mut self, event: ServiceEvent) {
let mut failed = vec![];
for (key, sub) in &self.subscribers {
log::trace!("UPDATE: [sub-{key}] {} -> {:?}", &event.id, &event.state);
if sub.send(event).is_err() {
failed.push(*key);
}
}
if !failed.is_empty() {
self.subscribers.retain(|k, _| !failed.contains(k));
}
}
async fn next_event(&mut self) -> SvcResult<()> {
tokio::select! {
event = self.control_rx.recv() => self.handle_svm_request(event.ok_or(SvcError::Shutdown)?).await,
event = self.service_rx.recv() => {
self.handle_service_event(event.ok_or(SvcError::Shutdown)?);
Ok(())
},
}
}
fn handle_service_event(&mut self, event: ServiceEvent) {
self.notify_subscribers(event);
let name = &self.svcs[&event.id].name;
log::trace!("[{name}] [{}] Service is now {:?}", event.id, event.state);
self.svcs.get_mut(&event.id).unwrap().state = event.state;
}
async fn handle_svm_request(&mut self, upd: SvmRequest) -> SvcResult<()> {
match upd {
SvmRequest::Start(rpc) => rpc.respond(|id| self.start(&id)),
SvmRequest::Stop(rpc) => rpc.respond(|id| self.stop(&id)),
SvmRequest::Status(rpc) => rpc.respond(|id| Ok(self.get(&id)?.state)),
SvmRequest::List(rpc) => rpc.respond(|()| {
let mut res = vec![];
for (name, id) in &self.names {
res.push((*id, name.clone()));
}
res
}),
SvmRequest::Register(rpc) => {
rpc.respond(|(name, svc)| self.register(ServiceName::from(name), svc));
}
SvmRequest::RegisterTemplate(rpc) => rpc.respond(|(name, tmpl)| {
self.templates.insert(name, tmpl);
Ok(())
}),
SvmRequest::Resolve(rpc) => rpc.respond(|id| self.resolve(&id)),
SvmRequest::LookupName(rpc) => rpc.respond(|id| Ok(self.get(&id)?.name.clone())),
SvmRequest::Subscribe(rpc) => {
for (id, svc) in &self.svcs {
rpc.data().send(ServiceEvent::new(*id, svc.state))?;
}
rpc.respond(|tx| {
let uuid = Uuid::new_v4();
self.subscribers.insert(uuid, tx);
Ok(uuid)
});
}
SvmRequest::Shutdown(rpc) => {
log::info!("Service manager shutting down..");
let ids: Vec<Uuid> = self.list().copied().collect();
self.stop_multiple(&ids)?;
select! {
Ok(()) = Box::pin(self.wait_for_multiple(&ids, ServiceState::Stopped)) => {}
() = tokio::time::sleep(Duration::from_secs(3)) => {
log::error!("Service shutdown timed out, aborting tasks..");
for id in &ids {
let si = self.get(id)?;
log::error!(" ..aborting {id}: {si:?}");
self.abort(&ServiceId::from(*id))?;
}
}
}
log::debug!("All services stopped.");
self.shutdown = true;
rpc.respond(|()| ());
}
}
Ok(())
}
fn stop_multiple(&self, handles: &[impl IntoServiceId]) -> SvcResult<()> {
let ids = self.resolve_multiple(handles)?;
for id in ids {
self.stop(id)?;
}
Ok(())
}
fn resolve_multiple(&self, handles: &[impl IntoServiceId]) -> SvcResult<BTreeSet<Uuid>> {
let res = BTreeSet::from_iter(
handles
.iter()
.map(|id| self.resolve(id))
.collect::<Result<Vec<Uuid>, SvcError>>()?,
);
Ok(res)
}
async fn wait_for_multiple(
&mut self,
handles: &[impl IntoServiceId],
target: ServiceState,
) -> SvcResult<()> {
let mut missing = self.resolve_multiple(handles)?;
let mut done = BTreeSet::new();
loop {
for m in &missing {
let state = self.get(*m)?.state;
if state == ServiceState::Failed && target != ServiceState::Stopped {
return Err(SvcError::ServiceFailed);
}
if state == target {
done.insert(*m);
}
}
missing.retain(|f| !done.contains(f));
if missing.is_empty() {
return Ok(());
}
self.next_event().await?;
}
}
pub async fn run(mut self) -> SvcResult<()> {
while !self.shutdown {
self.next_event().await?;
}
Ok(())
}
}

71
crates/svc/src/policy.rs Normal file
View File

@@ -0,0 +1,71 @@
//! Implements policies for service behavior (retry count, delay, etc).
use std::time::Duration;
#[cfg(feature = "manager")]
use tokio::time::sleep;
#[derive(Debug, Clone, Copy)]
pub enum Retry {
No,
Limit(u32),
Forever,
}
#[derive(Debug, Clone, Copy)]
pub struct Policy {
pub retry: Retry,
pub delay: Option<Duration>,
}
impl Default for Policy {
fn default() -> Self {
Self::new()
}
}
impl Policy {
#[must_use]
pub const fn new() -> Self {
Self {
retry: Retry::No,
delay: None,
}
}
#[must_use]
pub const fn with_retry(self, retry: Retry) -> Self {
Self { retry, ..self }
}
#[must_use]
pub const fn with_delay(self, delay: Duration) -> Self {
Self {
delay: Some(delay),
..self
}
}
#[must_use]
pub const fn without_delay(self) -> Self {
Self {
delay: None,
..self
}
}
#[cfg(feature = "manager")]
pub async fn sleep(&self) {
if let Some(dur) = self.delay {
sleep(dur).await;
}
}
#[must_use]
pub const fn should_retry(&self, retry: u32) -> bool {
match self.retry {
Retry::No => false,
Retry::Limit(limit) => retry < limit,
Retry::Forever => true,
}
}
}

34
crates/svc/src/rpc.rs Normal file
View File

@@ -0,0 +1,34 @@
//! Data types for request/response-style communication.
use tokio::sync::oneshot;
use tokio::sync::oneshot::{Receiver, Sender};
#[derive(Debug)]
pub struct RpcRequest<Q, A> {
data: Q,
rsp: Sender<A>,
}
impl<Q, A> RpcRequest<Q, A> {
pub fn new(data: Q) -> (Self, Receiver<A>) {
let (tx, rx) = oneshot::channel();
let req = Self { data, rsp: tx };
(req, rx)
}
pub const fn data(&self) -> &Q {
&self.data
}
pub fn into_inner(self) -> (Q, Sender<A>) {
(self.data, self.rsp)
}
pub fn inspect(&mut self, func: impl Fn(&mut Q)) {
func(&mut self.data);
}
pub fn respond(self, func: impl FnOnce(Q) -> A) {
let res = func(self.data);
let _ = self.rsp.send(res);
}
}

View File

@@ -0,0 +1,260 @@
use async_trait::async_trait;
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::time::sleep;
use uuid::Uuid;
use crate::error::RunSvcError;
use crate::manager::{ServiceEvent, ServiceFunc};
use crate::policy::{Policy, Retry};
use crate::traits::{Service, ServiceRunner, ServiceState, StopResult};
#[allow(clippy::struct_field_names)]
struct State {
id: Uuid,
retry: u32,
state: ServiceState,
tx: mpsc::UnboundedSender<ServiceEvent>,
}
impl State {
pub const fn new(
id: Uuid,
state: ServiceState,
tx: mpsc::UnboundedSender<ServiceEvent>,
) -> Self {
Self {
id,
retry: 0,
state,
tx,
}
}
pub fn set(&mut self, next: ServiceState) -> Result<(), RunSvcError> {
self.state = next;
self.retry = 0;
Ok(self.tx.send(ServiceEvent::new(self.id, self.state))?)
}
pub const fn get(&self) -> ServiceState {
self.state
}
pub const fn retry(&mut self) -> u32 {
let res = self.retry;
self.retry += 1;
res
}
}
pub struct StandardService<S: Service> {
name: String,
svc: S,
configure_policy: Policy,
start_policy: Policy,
run_policy: Policy,
stop_policy: Policy,
}
impl<S: Service> StandardService<S> {
pub fn new(name: impl AsRef<str>, svc: S) -> Self {
Self {
name: name.as_ref().to_string(),
svc,
configure_policy: Policy::new(),
start_policy: Policy::new()
.with_delay(Duration::from_secs(1))
.with_retry(Retry::Limit(3)),
run_policy: Policy::new().with_delay(Duration::from_secs(1)),
stop_policy: Policy::new(),
}
}
#[allow(clippy::missing_const_for_fn)]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub const fn with_configure_policy(mut self, policy: Policy) -> Self {
self.configure_policy = policy;
self
}
#[must_use]
pub const fn with_start_policy(mut self, policy: Policy) -> Self {
self.start_policy = policy;
self
}
#[must_use]
pub const fn with_run_policy(mut self, policy: Policy) -> Self {
self.run_policy = policy;
self
}
#[must_use]
pub const fn with_stop_policy(mut self, policy: Policy) -> Self {
self.stop_policy = policy;
self
}
}
impl<S: Service + 'static> StandardService<S> {
pub fn boxed(self) -> ServiceFunc {
Box::new(|a, b, c| self.run(a, b, c))
}
}
#[allow(clippy::too_many_lines)]
#[async_trait]
impl<S: Service> ServiceRunner for StandardService<S> {
async fn run(
mut self,
id: Uuid,
mut rx: watch::Receiver<ServiceState>,
tx: mpsc::UnboundedSender<ServiceEvent>,
) -> Result<(), RunSvcError> {
let name = self.name;
let target = &format!("[{name}]");
let mut svc = self.svc;
log::trace!(target:target, "Registered");
svc.configure()
.await
.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
let mut state = State::new(id, ServiceState::Registered, tx);
loop {
match state.get() {
ServiceState::Registered => {
if *rx.borrow() == ServiceState::Running {
match svc.configure().await {
Ok(()) => {
log::trace!(target:target, "Configured");
state.set(ServiceState::Configured)?;
}
Err(err) => {
log::error!(target:target, "Failed to configure service: {err}");
sleep(Duration::from_secs(3)).await;
}
}
} else {
rx.changed().await?;
}
}
ServiceState::Configured => {
log::trace!(target:target, "Service configured, and is ready start.");
if *rx.borrow_and_update() == ServiceState::Running {
state.set(ServiceState::Starting)?;
} else {
rx.changed().await?;
}
}
ServiceState::Starting => match svc.start().await {
Ok(()) => {
log::debug!(target:target, "Started");
state.set(ServiceState::Running)?;
}
Err(err) => {
log::error!(target:target, "Failed to start service: {err}");
if *rx.borrow() == ServiceState::Stopped {
state.set(ServiceState::Stopped)?;
} else {
sleep(Duration::from_secs(3)).await;
}
}
},
ServiceState::Running => {
tokio::select! {
res = svc.run() => match res {
Ok(()) => {
log::debug!(target:target, "Service completed successfully");
state.set(ServiceState::Stopping)?;
}
Err(err) => {
self.run_policy.sleep().await;
if self.run_policy.should_retry(state.retry()) {
log::warn!(target:target, "Service failed to start, retrying..");
} else {
log::error!(target:target, "Failed to run service: {err}");
match svc.stop().await {
Ok(()) => {
log::debug!(target:target, "Stopped failing service");
}
Err(err) => {
log::error!(
"Failed to stop failing service: {err}"
);
}
}
state.set(ServiceState::Failed)?;
}
}
},
_ = rx.changed() => if *rx.borrow() == ServiceState::Stopped {
log::trace!(target:target, "Stopping service");
let stop = svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
match stop {
StopResult::Delivered => {
log::trace!(target:target, "Service state change requested (graceful)");
tokio::select! {
res = svc.run() => {
log::trace!(target:target, "Service finished running within timeout: {res:?}");
},
() = sleep(Duration::from_secs(1)) => {
log::warn!("timeout");
}
}
state.set(ServiceState::Stopping)?;
}
StopResult::NotSupported => {
log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow());
if *rx.borrow_and_update() == ServiceState::Stopped {
state.set(ServiceState::Stopping)?;
}
}
}
}
}
}
ServiceState::Stopping => match svc.stop().await {
Ok(()) => {
log::trace!(target:target, "Stopping");
state.set(ServiceState::Stopped)?;
}
Err(err) => {
log::error!(target:target, "Failed to stop service: {err}");
sleep(Duration::from_secs(3)).await;
}
},
ServiceState::Stopped => {
rx.changed().await?;
if rx.has_changed()? {
log::debug!(target:target, "Service stopped.");
}
if *rx.borrow_and_update() == ServiceState::Running {
state.set(ServiceState::Starting)?;
}
}
ServiceState::Failed => {
rx.changed().await?;
if rx.has_changed()? {
log::debug!(target:target, "Service failed.");
}
if *rx.borrow() == ServiceState::Stopped {
state.set(ServiceState::Stopped)?;
}
}
}
}
}
}

153
crates/svc/src/serviceid.rs Normal file
View File

@@ -0,0 +1,153 @@
use std::fmt::{Debug, Display};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(from = "String", into = "String")]
pub struct ServiceName {
name: String,
instance: Option<String>,
}
impl From<ServiceName> for String {
fn from(value: ServiceName) -> Self {
value.to_string()
}
}
impl ServiceName {
#[must_use]
pub const fn new(name: String, instance: Option<String>) -> Self {
Self { name, instance }
}
// suppress clippy false-positive
#[allow(clippy::missing_const_for_fn)]
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn instance(&self) -> Option<&str> {
self.instance.as_deref()
}
}
impl Display for ServiceName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self {
name,
instance: None,
} => write!(f, "{name}"),
Self {
name,
instance: Some(instance),
} => write!(f, "{name}@{instance}"),
}
}
}
#[derive(Debug, Clone)]
pub enum ServiceId {
Name(ServiceName),
Id(Uuid),
}
impl ServiceId {
pub fn instance(name: impl Into<String>, instance: impl Into<String>) -> Self {
Self::Name(ServiceName {
name: name.into(),
instance: Some(instance.into()),
})
}
}
impl Display for ServiceId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Name(name) => write!(f, "{name}"),
Self::Id(uuid) => write!(f, "{uuid}"),
}
}
}
pub trait IntoServiceId: Display + Debug + Clone {
fn service_id(self) -> ServiceId;
}
impl IntoServiceId for ServiceId {
fn service_id(self) -> ServiceId {
self
}
}
impl<I: IntoServiceId> IntoServiceId for &I {
fn service_id(self) -> ServiceId {
self.clone().service_id()
}
}
impl IntoServiceId for Uuid {
fn service_id(self) -> ServiceId {
ServiceId::Id(self)
}
}
impl IntoServiceId for String {
fn service_id(self) -> ServiceId {
ServiceId::Name(ServiceName::from(self))
}
}
impl IntoServiceId for &str {
fn service_id(self) -> ServiceId {
ServiceId::Name(ServiceName::from(self))
}
}
impl From<Uuid> for ServiceId {
fn from(value: Uuid) -> Self {
Self::Id(value)
}
}
impl From<String> for ServiceId {
fn from(value: String) -> Self {
value.service_id()
}
}
impl From<String> for ServiceName {
fn from(value: String) -> Self {
if let Some((name, instance)) = value.split_once('@') {
Self {
name: name.to_string(),
instance: Some(instance.to_string()),
}
} else {
Self {
name: value,
instance: None,
}
}
}
}
impl From<&str> for ServiceName {
fn from(value: &str) -> Self {
if let Some((name, instance)) = value.split_once('@') {
Self {
name: name.to_string(),
instance: Some(instance.to_string()),
}
} else {
Self {
name: value.to_string(),
instance: None,
}
}
}
}

View File

@@ -0,0 +1,70 @@
use async_trait::async_trait;
#[cfg(feature = "manager")]
use crate::error::RunSvcError;
use crate::error::SvcError;
use crate::traits::{BoxDynService, Service, StopResult};
#[cfg(feature = "manager")]
pub trait ServiceTemplate: Send {
fn generate(&self, instance: String) -> Result<BoxDynService, SvcError>;
}
pub struct ErrorAdapter<S: Service> {
svc: S,
}
impl<S: Service> ErrorAdapter<S> {
pub const fn new(svc: S) -> Self {
Self { svc }
}
}
#[async_trait]
impl<S: Service> Service for ErrorAdapter<S> {
type Error = RunSvcError;
async fn configure(&mut self) -> Result<(), Self::Error> {
self.svc
.configure()
.await
.map_err(|err| RunSvcError::ServiceError(Box::new(err)))
}
async fn start(&mut self) -> Result<(), Self::Error> {
self.svc
.start()
.await
.map_err(|err| RunSvcError::ServiceError(Box::new(err)))
}
async fn run(&mut self) -> Result<(), Self::Error> {
self.svc
.run()
.await
.map_err(|err| RunSvcError::ServiceError(Box::new(err)))
}
async fn stop(&mut self) -> Result<(), Self::Error> {
self.svc
.stop()
.await
.map_err(|err| RunSvcError::ServiceError(Box::new(err)))
}
async fn signal_stop(&mut self) -> Result<StopResult, Self::Error> {
self.svc
.signal_stop()
.await
.map_err(|err| RunSvcError::ServiceError(Box::new(err)))
}
}
impl<F> ServiceTemplate for F
where
F: Fn(String) -> Result<BoxDynService, SvcError> + Send,
{
fn generate(&self, instance: String) -> Result<BoxDynService, SvcError> {
self(instance)
}
}

162
crates/svc/src/traits.rs Normal file
View File

@@ -0,0 +1,162 @@
use std::error::Error;
use async_trait::async_trait;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
#[cfg(feature = "manager")]
use crate::error::RunSvcError;
#[cfg(feature = "manager")]
use crate::manager::ServiceEvent;
#[cfg(feature = "manager")]
use crate::template::ErrorAdapter;
#[cfg(feature = "manager")]
use std::future::Future;
#[cfg(feature = "manager")]
use tokio::sync::{mpsc, watch};
#[cfg(feature = "manager")]
use uuid::Uuid;
/**
State of a [`Service`] running on a [`crate::manager::ServiceManager`].
Transition diagram for [`ServiceState`]:
```text
┌────────────────┐
│ Registered ├──┐
│ │ │
└───────────┬────┘ │
┌───────────▼────┐ │
│ Configured │ │
│ │ │
└───────────┬────┘ │
┌───────────▼────┐ │
┌─►│ Starting ├──┤
│ │ │ │
│ └───────────┬────┘ │
│ ┌───────────▼────┐ │
│ │ Running ├──┤
│ │ │ │
│ └───────────┬────┘ │
│ ┌───────────▼────┐ │
│ │ Stopping ├──┤
│ │ │ │
│ └───────────┬────┘ │
│ ┌───────────▼────┐ │
└──┤ Stopped │ │
┌─►│ │ │
│ └────────────────┘ │
│ ┌────────────────┐ │
│ │ Failed │ │
└──┤ │◄─┘
└────────────────┘
```
*/
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ServiceState {
/// Service is registered with the service manager, but not configured yet
Registered,
/// Service is registered, and has finished one-time setup in preparation for running
Configured,
/// Service is in the starting phase. If successfull, it will then be in [`ServiceState::Running`].
Starting,
/// Service is running normally
Running,
/// Servic is in the shutdown phase. If successfull, it will then be in [`ServiceState::Stopped`].
Stopping,
/// Service is not running, but is ready to start up again
Stopped,
/// Service has failed
Failed,
}
pub enum StopResult {
Delivered,
NotSupported,
}
#[async_trait]
pub trait Service: Send {
type Error: Error + Send + 'static;
async fn configure(&mut self) -> Result<(), Self::Error> {
Ok(())
}
async fn start(&mut self) -> Result<(), Self::Error> {
Ok(())
}
async fn run(&mut self) -> Result<(), Self::Error>;
async fn stop(&mut self) -> Result<(), Self::Error> {
Ok(())
}
async fn signal_stop(&mut self) -> Result<StopResult, Self::Error> {
Ok(StopResult::NotSupported)
}
#[cfg(feature = "manager")]
fn boxed(self) -> BoxDynService
where
Self: Sized + Unpin + 'static,
{
Box::new(ErrorAdapter::new(self)) as BoxDynService
}
}
#[cfg(feature = "manager")]
pub type BoxDynService = Box<dyn Service<Error = RunSvcError> + Unpin + 'static>;
#[cfg(feature = "manager")]
impl Service for BoxDynService {
type Error = RunSvcError;
fn run<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> {
(**self).run()
}
fn configure<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> {
(**self).configure()
}
fn start<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> {
(**self).start()
}
fn stop<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<(), Self::Error>> {
(**self).stop()
}
fn signal_stop<'a: 'b, 'b>(&'a mut self) -> BoxFuture<'b, Result<StopResult, Self::Error>> {
(**self).signal_stop()
}
}
#[cfg(feature = "manager")]
#[async_trait]
pub trait ServiceRunner {
async fn run(
mut self,
id: Uuid,
rx: watch::Receiver<ServiceState>,
tx: mpsc::UnboundedSender<ServiceEvent>,
) -> Result<(), RunSvcError>;
}
#[cfg(feature = "manager")]
#[async_trait]
impl<E, F> Service for F
where
E: Error + Send + 'static,
F: Future<Output = Result<(), E>> + Send + Unpin,
{
type Error = E;
async fn run(&mut self) -> Result<(), E> {
self.await
}
}