Experiment with futures
This commit is contained in:
parent
907fbf95db
commit
2d3e3688fa
44 changed files with 2081 additions and 1242 deletions
318
src/setup/symbol_runner.rs
Normal file
318
src/setup/symbol_runner.rs
Normal file
|
|
@ -0,0 +1,318 @@
|
|||
use crate::loggers::Logger;
|
||||
use crate::symbols::Symbol;
|
||||
use async_trait::async_trait;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[async_trait(?Send)]
|
||||
pub trait SymbolRunner {
|
||||
async fn run_symbol<S: Symbol + Debug, L: Logger>(
|
||||
&self,
|
||||
symbol: &S,
|
||||
logger: &L,
|
||||
force: bool,
|
||||
) -> Result<bool, Box<dyn Error>>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SymbolRunError {
|
||||
Symbol(Box<dyn Error>),
|
||||
ExecuteDidNotReach(()),
|
||||
}
|
||||
|
||||
impl Error for SymbolRunError {
|
||||
fn cause(&self) -> Option<&dyn Error> {
|
||||
match self {
|
||||
Self::Symbol(ref e) => Some(&**e),
|
||||
Self::ExecuteDidNotReach(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for SymbolRunError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Symbol(ref e) => write!(f, "{}", e),
|
||||
Self::ExecuteDidNotReach(_) => write!(f, "Target not reached after executing symbol"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct InitializingSymbolRunner;
|
||||
|
||||
impl InitializingSymbolRunner {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
async fn exec_symbol<S: Symbol + Debug, L: Logger>(
|
||||
&self,
|
||||
symbol: &S,
|
||||
logger: &L,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
logger.info(format!("Executing {:?}", symbol));
|
||||
symbol.execute().await?;
|
||||
let target_reached = symbol.target_reached().await?;
|
||||
logger.trace(format!(
|
||||
"Symbol reports target_reached: {:?} (should be true)",
|
||||
target_reached
|
||||
));
|
||||
if target_reached {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Box::new(SymbolRunError::ExecuteDidNotReach(())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl SymbolRunner for InitializingSymbolRunner {
|
||||
async fn run_symbol<S: Symbol + Debug, L: Logger>(
|
||||
&self,
|
||||
symbol: &S,
|
||||
logger: &L,
|
||||
force: bool,
|
||||
) -> Result<bool, Box<dyn Error>> {
|
||||
let executed = if force {
|
||||
logger.debug("Forcing symbol execution");
|
||||
self.exec_symbol(symbol, logger).await?;
|
||||
true
|
||||
} else {
|
||||
let target_reached = symbol.target_reached().await?;
|
||||
if target_reached {
|
||||
logger.debug(format!("{:?} already reached", symbol));
|
||||
} else {
|
||||
logger.trace(format!(
|
||||
"Symbol reports target_reached: {:?}",
|
||||
target_reached
|
||||
));
|
||||
self.exec_symbol(symbol, logger).await?;
|
||||
}
|
||||
!target_reached
|
||||
};
|
||||
Ok(executed)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct DrySymbolRunner;
|
||||
|
||||
impl DrySymbolRunner {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl SymbolRunner for DrySymbolRunner {
|
||||
async fn run_symbol<S: Symbol + Debug, L: Logger>(
|
||||
&self,
|
||||
symbol: &S,
|
||||
logger: &L,
|
||||
force: bool,
|
||||
) -> Result<bool, Box<dyn Error>> {
|
||||
let would_execute = if force {
|
||||
logger.info(format!("Would force-execute {:?}", symbol));
|
||||
true
|
||||
} else {
|
||||
let target_reached = symbol.target_reached().await?;
|
||||
logger.debug(format!(
|
||||
"Symbol reports target_reached: {:?}",
|
||||
target_reached
|
||||
));
|
||||
if !target_reached {
|
||||
logger.info(format!("Would execute {:?}", symbol));
|
||||
}
|
||||
!target_reached
|
||||
};
|
||||
Ok(would_execute)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReportingSymbolRunner<R>(R);
|
||||
|
||||
impl<R> ReportingSymbolRunner<R> {
|
||||
pub fn new(symbol_runner: R) -> Self {
|
||||
Self(symbol_runner)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl<R> SymbolRunner for ReportingSymbolRunner<R>
|
||||
where
|
||||
R: SymbolRunner,
|
||||
{
|
||||
async fn run_symbol<S: Symbol + Debug, L: Logger>(
|
||||
&self,
|
||||
symbol: &S,
|
||||
logger: &L,
|
||||
force: bool,
|
||||
) -> Result<bool, Box<dyn Error>> {
|
||||
logger.debug(format!("Running symbol {:?}", symbol));
|
||||
let res = self.0.run_symbol(symbol, logger, force).await;
|
||||
if let Err(ref e) = res {
|
||||
logger.info(format!("Failed on {:?} with {}, aborting.", symbol, e))
|
||||
} else {
|
||||
logger.debug(format!("Successfully finished {:?}", symbol))
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::{DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner, SymbolRunner};
|
||||
use crate::async_utils::sleep;
|
||||
use crate::async_utils::{run, try_join};
|
||||
use crate::loggers::StoringLogger;
|
||||
use crate::symbols::Symbol;
|
||||
use async_trait::async_trait;
|
||||
use std::cell::RefCell;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
enum DummySymbolError {
|
||||
Error(()),
|
||||
}
|
||||
|
||||
impl Error for DummySymbolError {}
|
||||
|
||||
impl fmt::Display for DummySymbolError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Dummy symbol error")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct DummySymbol<T, E> {
|
||||
_target_reached: RefCell<T>,
|
||||
_execute: RefCell<E>,
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl<
|
||||
E: Iterator<Item = Result<(), Box<dyn Error>>>,
|
||||
T: Iterator<Item = Result<bool, Box<dyn Error>>>,
|
||||
> Symbol for DummySymbol<T, E>
|
||||
{
|
||||
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
|
||||
self._target_reached.borrow_mut().next().unwrap()
|
||||
}
|
||||
async fn execute(&self) -> Result<(), Box<dyn Error>> {
|
||||
self._execute.borrow_mut().next().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<
|
||||
E: Iterator<Item = Result<(), Box<dyn Error>>>,
|
||||
T: Iterator<Item = Result<bool, Box<dyn Error>>>,
|
||||
> DummySymbol<T, E>
|
||||
{
|
||||
fn new<
|
||||
IE: IntoIterator<IntoIter = E, Item = Result<(), Box<dyn Error>>>,
|
||||
IT: IntoIterator<IntoIter = T, Item = Result<bool, Box<dyn Error>>>,
|
||||
>(
|
||||
target_reached: IT,
|
||||
execute: IE,
|
||||
) -> Self {
|
||||
Self {
|
||||
_target_reached: RefCell::new(target_reached.into_iter()),
|
||||
_execute: RefCell::new(execute.into_iter()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run_symbol<S: Symbol + Debug>(s: S) -> Result<bool, Box<dyn Error>> {
|
||||
run(InitializingSymbolRunner::new().run_symbol(&s, &StoringLogger::new(), false))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nothing_needed_to_be_done() {
|
||||
let result = run_symbol(DummySymbol::new(vec![Ok(true)], vec![Ok(())]));
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn everything_is_ok() {
|
||||
let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]));
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn executing_did_not_change_state() {
|
||||
let result = run_symbol(DummySymbol::new(vec![Ok(false), Ok(false)], vec![Ok(())]));
|
||||
assert_eq!(
|
||||
result.unwrap_err().to_string(),
|
||||
"Target not reached after executing symbol"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn executing_did_not_work() {
|
||||
let result = run_symbol(DummySymbol::new(
|
||||
vec![Ok(false)],
|
||||
vec![Err(Box::new(DummySymbolError::Error(())) as Box<dyn Error>)],
|
||||
));
|
||||
assert_eq!(result.unwrap_err().to_string(), "Dummy symbol error");
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SleeperSymbol;
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl Symbol for SleeperSymbol {
|
||||
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
|
||||
sleep(Duration::from_millis(0)).await;
|
||||
Ok(true)
|
||||
}
|
||||
async fn execute(&self) -> Result<(), Box<dyn Error>> {
|
||||
unimplemented!();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actually_support_parallel_execution() {
|
||||
run(async {
|
||||
let s1 = SleeperSymbol;
|
||||
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
|
||||
|
||||
let l1 = StoringLogger::new();
|
||||
let l2 = StoringLogger::new();
|
||||
let runner1 = InitializingSymbolRunner::new();
|
||||
let result = try_join!(
|
||||
runner1.run_symbol(&s1, &l1, false),
|
||||
runner1.run_symbol(&s2, &l2, false),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result, (false, true));
|
||||
|
||||
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
|
||||
let l1 = StoringLogger::new();
|
||||
let l2 = StoringLogger::new();
|
||||
let runner2 = DrySymbolRunner::new();
|
||||
let result = try_join!(
|
||||
runner2.run_symbol(&s1, &l1, false),
|
||||
runner2.run_symbol(&s2, &l2, false),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result, (false, true));
|
||||
|
||||
let s2 = DummySymbol::new(vec![Ok(false), Ok(true)], vec![Ok(())]);
|
||||
let l1 = StoringLogger::new();
|
||||
let l2 = StoringLogger::new();
|
||||
let runner3 = ReportingSymbolRunner::new(runner1);
|
||||
let result = try_join!(
|
||||
runner3.run_symbol(&s1, &l1, false),
|
||||
runner3.run_symbol(&s2, &l2, false),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result, (false, true));
|
||||
});
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue