use crate::async_utils::sleep; use crate::loggers::Logger; use crate::symbols::Symbol; use async_trait::async_trait; use std::error::Error; use std::fmt; use std::fmt::Debug; use std::time::Duration; #[async_trait(?Send)] pub trait SymbolRunner { async fn run_symbol( &self, symbol: &S, logger: &L, force: bool, ) -> Result>; } #[derive(Debug)] pub enum SymbolRunError { Symbol(Box), ExecuteDidNotReach(()), } impl Error for SymbolRunError { fn cause(&self) -> Option<&dyn Error> { match self { Self::Symbol(ref e) => Some(&**e), Self::ExecuteDidNotReach(_) => None, } } } impl fmt::Display for SymbolRunError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Symbol(ref e) => write!(f, "{}", e), Self::ExecuteDidNotReach(_) => write!(f, "Target not reached after executing symbol"), } } } #[derive(Clone, Debug, Default)] pub struct InitializingSymbolRunner; impl InitializingSymbolRunner { pub fn new() -> Self { Self } async fn exec_symbol( &self, symbol: &S, logger: &L, ) -> Result<(), Box> { logger.info(format!("Executing {:?}", symbol)); symbol.execute().await?; let target_reached = symbol.target_reached().await?; logger.trace(format!( "Symbol reports target_reached: {:?} (should be true)", target_reached )); if target_reached { Ok(()) } else { Err(Box::new(SymbolRunError::ExecuteDidNotReach(()))) } } } #[async_trait(?Send)] impl SymbolRunner for InitializingSymbolRunner { async fn run_symbol( &self, symbol: &S, logger: &L, force: bool, ) -> Result> { let executed = if force { logger.debug("Forcing symbol execution"); self.exec_symbol(symbol, logger).await?; true } else { let target_reached = symbol.target_reached().await?; if target_reached { logger.debug(format!("{:?} already reached", symbol)); } else { logger.trace(format!( "Symbol reports target_reached: {:?}", target_reached )); self.exec_symbol(symbol, logger).await?; } !target_reached }; Ok(executed) } } #[derive(Clone, Debug)] pub struct DelayingSymbolRunner(R); impl DelayingSymbolRunner { pub fn new(symbol_runner: R) -> Self { Self(symbol_runner) } } #[async_trait(?Send)] impl SymbolRunner for DelayingSymbolRunner where R: SymbolRunner, { async fn run_symbol( &self, symbol: &S, logger: &L, force: bool, ) -> Result> { sleep(Duration::from_millis( (((std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .subsec_micros() % 20) as f32 / 2.0) .exp() / 8.0) as u64, )) .await; self.0.run_symbol(symbol, logger, force).await } } #[derive(Clone, Debug, Default)] pub struct DrySymbolRunner; impl DrySymbolRunner { pub fn new() -> Self { Self } } #[async_trait(?Send)] impl SymbolRunner for DrySymbolRunner { async fn run_symbol( &self, symbol: &S, logger: &L, force: bool, ) -> Result> { let would_execute = if force { logger.info(format!("Would force-execute {:?}", symbol)); true } else { let target_reached = symbol.target_reached().await?; logger.debug(format!( "Symbol reports target_reached: {:?}", target_reached )); if !target_reached { logger.info(format!("Would execute {:?}", symbol)); } !target_reached }; Ok(would_execute) } } #[derive(Clone, Debug)] pub struct ReportingSymbolRunner(R); impl ReportingSymbolRunner { pub fn new(symbol_runner: R) -> Self { Self(symbol_runner) } } #[async_trait(?Send)] impl SymbolRunner for ReportingSymbolRunner where R: SymbolRunner, { async fn run_symbol( &self, symbol: &S, logger: &L, force: bool, ) -> Result> { logger.debug(format!("Running symbol {:?}", symbol)); let res = self.0.run_symbol(symbol, logger, force).await; if let Err(ref e) = res { logger.info(format!("Failed on {:?} with {}, aborting.", symbol, e)) } else { logger.debug(format!("Successfully finished {:?}", symbol)) } res } } #[cfg(test)] mod test { use super::{DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner, SymbolRunner}; use crate::async_utils::sleep; use crate::async_utils::{run, try_join}; use crate::loggers::StoringLogger; use crate::symbols::Symbol; use async_trait::async_trait; use std::cell::RefCell; use std::error::Error; use std::fmt; use std::fmt::Debug; use std::time::Duration; #[derive(Debug, PartialEq, Clone)] enum DummySymbolError { Error(()), } impl Error for DummySymbolError {} impl fmt::Display for DummySymbolError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Dummy symbol error") } } #[derive(Debug)] struct DummySymbol { _target_reached: RefCell, _execute: RefCell, } #[async_trait(?Send)] impl< E: Iterator>>, T: Iterator>>, > Symbol for DummySymbol { async fn target_reached(&self) -> Result> { self._target_reached.borrow_mut().next().unwrap() } async fn execute(&self) -> Result<(), Box> { self._execute.borrow_mut().next().unwrap() } } impl< E: Iterator>>, T: Iterator>>, > DummySymbol { fn new< IE: IntoIterator>>, IT: IntoIterator>>, >( target_reached: IT, execute: IE, ) -> Self { Self { _target_reached: RefCell::new(target_reached.into_iter()), _execute: RefCell::new(execute.into_iter()), } } } fn run_symbol(s: S) -> Result> { run(InitializingSymbolRunner::new().run_symbol(&s, &StoringLogger::new(), false)) } #[test] fn nothing_needed_to_be_done() { let result = run_symbol(DummySymbol::new(vec![Ok(true)], vec![Ok(())])); assert!(result.is_ok()); } #[test] fn everything_is_ok() { let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())])); assert!(result.is_ok()); } #[test] fn executing_did_not_change_state() { let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(false)], vec![Ok(())])); assert_eq!( result.unwrap_err().to_string(), "Target not reached after executing symbol" ); } #[test] fn executing_did_not_work() { let result = run_symbol(DummySymbol::new( vec![Ok(false)], vec![Err(Box::new(DummySymbolError::Error(())) as Box)], )); assert_eq!(result.unwrap_err().to_string(), "Dummy symbol error"); } #[derive(Debug)] struct SleeperSymbol; #[async_trait(?Send)] impl Symbol for SleeperSymbol { async fn target_reached(&self) -> Result> { sleep(Duration::from_millis(0)).await; Ok(true) } async fn execute(&self) -> Result<(), Box> { unimplemented!(); } } #[test] fn actually_support_parallel_execution() { run(async { let s1 = SleeperSymbol; let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]); let l1 = StoringLogger::new(); let l2 = StoringLogger::new(); let runner1 = InitializingSymbolRunner::new(); let result = try_join!( runner1.run_symbol(&s1, &l1, false), runner1.run_symbol(&s2, &l2, false), ) .unwrap(); assert_eq!(result, (false, true)); let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]); let l1 = StoringLogger::new(); let l2 = StoringLogger::new(); let runner2 = DrySymbolRunner::new(); let result = try_join!( runner2.run_symbol(&s1, &l1, false), runner2.run_symbol(&s2, &l2, false), ) .unwrap(); assert_eq!(result, (false, true)); let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]); let l1 = StoringLogger::new(); let l2 = StoringLogger::new(); let runner3 = ReportingSymbolRunner::new(runner1); let result = try_join!( runner3.run_symbol(&s1, &l1, false), runner3.run_symbol(&s2, &l2, false), ) .unwrap(); assert_eq!(result, (false, true)); }); } }