A library for writing host-specific, single-binary configuration management and deployment tools
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

389 lines
9.2 KiB

use crate::async_utils::sleep;
use crate::symbols::Symbol;
use async_trait::async_trait;
use slog::{debug, info, o, trace, Logger};
#[cfg(test)]
use std::cell::RefCell;
use std::error::Error;
use std::fmt;
use std::fmt::Debug;
#[cfg(test)]
use std::rc::Rc;
use std::time::Duration;
#[async_trait(?Send)]
pub trait SymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>>;
}
#[derive(Debug, Default)]
pub struct ExecuteDidNotReachError;
impl Error for ExecuteDidNotReachError {}
impl fmt::Display for ExecuteDidNotReachError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Target not reached after executing symbol")
}
}
#[derive(Clone, Debug, Default)]
pub struct InitializingSymbolRunner;
impl InitializingSymbolRunner {
#[must_use]
pub const fn new() -> Self {
Self
}
async fn exec_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
) -> Result<(), Box<dyn Error>> {
info!(logger, "Executing {:?}", symbol);
symbol.execute().await?;
let target_reached = symbol.target_reached().await?;
trace!(
logger,
"Symbol reports target_reached: {:?} (should be true)",
target_reached
);
if target_reached {
Ok(())
} else {
Err(Box::new(ExecuteDidNotReachError))
}
}
}
#[async_trait(?Send)]
impl SymbolRunner for InitializingSymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
let executed = if force {
debug!(logger, "Forcing symbol execution");
self.exec_symbol(symbol, logger).await?;
true
} else {
let target_reached = symbol.target_reached().await?;
if target_reached {
debug!(logger, "{:?} already reached", symbol);
} else {
trace!(
logger,
"Symbol reports target_reached: {:?}",
target_reached
);
self.exec_symbol(symbol, logger).await?;
}
!target_reached
};
Ok(executed)
}
}
#[derive(Clone, Debug)]
pub struct DelayingSymbolRunner<R>(R);
impl<R> DelayingSymbolRunner<R> {
#[must_use]
pub const fn new(symbol_runner: R) -> Self {
Self(symbol_runner)
}
}
#[async_trait(?Send)]
impl<R> SymbolRunner for DelayingSymbolRunner<R>
where
R: SymbolRunner,
{
#[allow(
clippy::cast_sign_loss,
clippy::cast_possible_truncation,
clippy::cast_precision_loss
)]
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
sleep(Duration::from_millis(
(((std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.subsec_micros()
% 20) as f32
/ 2.0)
.exp()
/ 8.0) as u64,
))
.await;
self.0.run_symbol(symbol, logger, force).await
}
}
#[derive(Clone, Debug, Default)]
pub struct DrySymbolRunner;
impl DrySymbolRunner {
#[must_use]
pub const fn new() -> Self {
Self
}
}
#[async_trait(?Send)]
impl SymbolRunner for DrySymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
let would_execute = if force {
info!(logger, "Would force-execute");
true
} else {
let target_reached = symbol.target_reached().await?;
debug!(
logger,
"Symbol reports target_reached: {:?}", target_reached
);
if !target_reached {
info!(logger, "Would execute");
}
!target_reached
};
Ok(would_execute)
}
}
#[derive(Clone, Debug)]
pub struct ReportingSymbolRunner<R>(R);
impl<R> ReportingSymbolRunner<R> {
#[must_use]
pub const fn new(symbol_runner: R) -> Self {
Self(symbol_runner)
}
}
#[async_trait(?Send)]
impl<R> SymbolRunner for ReportingSymbolRunner<R>
where
R: SymbolRunner,
{
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
let logger = logger.new(o!("symbol" => format!("{symbol:?}")));
debug!(logger, "Running ...");
let res = self.0.run_symbol(symbol, &logger, force).await;
if let Err(ref e) = res {
info!(logger, "failed with {}", e);
} else {
debug!(logger, "Successfully finished");
}
res
}
}
#[cfg(test)]
pub struct TestSymbolRunner {
count: Rc<RefCell<usize>>,
}
#[cfg(test)]
impl TestSymbolRunner {
pub fn new() -> (Rc<RefCell<usize>>, Self) {
let count = Rc::new(RefCell::new(0));
let runner = TestSymbolRunner {
count: Rc::clone(&count),
};
(count, runner)
}
}
#[cfg(test)]
#[async_trait(?Send)]
impl SymbolRunner for TestSymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
info!(logger, "run");
let run = force || !symbol.target_reached().await?;
if run {
*self.count.borrow_mut() += 1;
}
Ok(run)
}
}
#[cfg(test)]
mod test {
use super::{DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner, SymbolRunner};
use crate::async_utils::sleep;
use crate::async_utils::{run, try_join};
use crate::symbols::Symbol;
use async_trait::async_trait;
use slog::{o, Discard, Logger};
use std::cell::RefCell;
use std::error::Error;
use std::fmt;
use std::fmt::Debug;
use std::time::Duration;
#[derive(Debug, PartialEq, Clone)]
enum DummySymbolError {
Error(()),
}
impl Error for DummySymbolError {}
impl fmt::Display for DummySymbolError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Dummy symbol error")
}
}
#[derive(Debug)]
struct DummySymbol<T, E> {
_target_reached: RefCell<T>,
_execute: RefCell<E>,
}
#[async_trait(?Send)]
impl<
E: Iterator<Item = Result<(), Box<dyn Error>>>,
T: Iterator<Item = Result<bool, Box<dyn Error>>>,
> Symbol for DummySymbol<T, E>
{
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
self._target_reached.borrow_mut().next().unwrap()
}
async fn execute(&self) -> Result<(), Box<dyn Error>> {
self._execute.borrow_mut().next().unwrap()
}
}
impl<
E: Iterator<Item = Result<(), Box<dyn Error>>>,
T: Iterator<Item = Result<bool, Box<dyn Error>>>,
> DummySymbol<T, E>
{
#[must_use]
fn new<
IE: IntoIterator<IntoIter = E, Item = Result<(), Box<dyn Error>>>,
IT: IntoIterator<IntoIter = T, Item = Result<bool, Box<dyn Error>>>,
>(
target_reached: IT,
execute: IE,
) -> Self {
Self {
_target_reached: RefCell::new(target_reached.into_iter()),
_execute: RefCell::new(execute.into_iter()),
}
}
}
fn run_symbol<S: Symbol + Debug>(s: S) -> Result<bool, Box<dyn Error>> {
run(InitializingSymbolRunner::new().run_symbol(&s, &Logger::root(Discard, o!()), false))
}
#[test]
fn nothing_needed_to_be_done() {
let result = run_symbol(DummySymbol::new(vec![Ok(true)], vec![Ok(())]));
assert!(result.is_ok());
}
#[test]
fn everything_is_ok() {
let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]));
assert!(result.is_ok());
}
#[test]
fn executing_did_not_change_state() {
let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(false)], vec![Ok(())]));
assert_eq!(
result.unwrap_err().to_string(),
"Target not reached after executing symbol"
);
}
#[test]
fn executing_did_not_work() {
let result = run_symbol(DummySymbol::new(
vec![Ok(false)],
vec![Err(Box::new(DummySymbolError::Error(())) as Box<dyn Error>)],
));
assert_eq!(result.unwrap_err().to_string(), "Dummy symbol error");
}
#[derive(Debug)]
struct SleeperSymbol;
#[async_trait(?Send)]
impl Symbol for SleeperSymbol {
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
sleep(Duration::from_millis(0)).await;
Ok(true)
}
async fn execute(&self) -> Result<(), Box<dyn Error>> {
unimplemented!();
}
}
#[test]
fn actually_support_parallel_execution() {
run(async {
let s1 = SleeperSymbol;
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
let l1 = Logger::root(Discard, o!());
let l2 = Logger::root(Discard, o!());
let runner1 = InitializingSymbolRunner::new();
let result = try_join!(
runner1.run_symbol(&s1, &l1, false),
runner1.run_symbol(&s2, &l2, false),
)
.unwrap();
assert_eq!(result, (false, true));
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
let runner2 = DrySymbolRunner::new();
let result = try_join!(
runner2.run_symbol(&s1, &l1, false),
runner2.run_symbol(&s2, &l2, false),
)
.unwrap();
assert_eq!(result, (false, true));
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
let runner3 = ReportingSymbolRunner::new(runner1);
let result = try_join!(
runner3.run_symbol(&s1, &l1, false),
runner3.run_symbol(&s2, &l2, false),
)
.unwrap();
assert_eq!(result, (false, true));
});
}
}