diff --git a/src/resources/mod.rs b/src/resources/mod.rs index a4a8ab9..e470765 100644 --- a/src/resources/mod.rs +++ b/src/resources/mod.rs @@ -228,7 +228,7 @@ impl Resource for Cron { use std::rc::{Rc, Weak}; pub trait FromResource { - fn from_resource(from: R) -> (Self, Weak) + fn from_resource(from: &Rc) -> (Self, Weak) where Self: Sized; } @@ -250,8 +250,7 @@ macro_rules! default_resources { } $(impl<'a, D> FromResource<$type> for DefaultResources<'a, D> { - fn from_resource(from: $type) -> (Self, Weak<$type>) { - let inner = Rc::new(from); + fn from_resource(inner: &Rc<$type>) -> (Self, Weak<$type>) { (Self::$name(Rc::clone(&inner)), Rc::downgrade(&inner)) } })* diff --git a/src/setup/cache.rs b/src/setup/cache.rs new file mode 100644 index 0000000..1e28287 --- /dev/null +++ b/src/setup/cache.rs @@ -0,0 +1,88 @@ +use super::{Add, AddResult, AddableResource}; +use crate::async_utils::sleep; +use crate::resources::{FromArtifact, FromResource}; +use async_trait::async_trait; +use futures_util::future::{FutureExt, Shared}; +use slog::{trace, Logger}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::fmt::Debug; +use std::future::Future; +use std::hash::Hash; +use std::pin::Pin; +use std::rc::Rc; +use std::time::Duration; + +type ResourceCache = HashMap>>>>; + +#[derive(Debug)] +pub struct Cache { + resources: RefCell>, + inner: Rc, +} + +impl Cache { + pub fn new(inner: I) -> Self { + Self { + resources: RefCell::default(), + inner: Rc::new(inner), + } + } +} + +#[async_trait(?Send)] +impl Add for Cache +where + Rs: Hash + Eq + 'static + FromResource, + As: 'static + FromArtifact + Clone, + I: 'static + Add, +{ + // FIXME: https://github.com/rust-lang/rust-clippy/issues/6353 + #[allow(clippy::await_holding_refcell_ref)] + async fn add(&self, logger: &Rc, resource: Rc, force_run: bool) -> AddResult { + let (storable_resource, weak_resource) = Rs::from_resource(&resource); + let mut resources = self.resources.borrow_mut(); + let result = if let Some(future) = resources.remove(&storable_resource) { + assert!( + !force_run, + "Forcing to run an already-added resource is a logical error" + ); + resources.insert(storable_resource, future.clone()); + drop(resources); + trace!(logger, "Resource already added"); + Ok(future.await) + } else { + let inner_weak = Rc::downgrade(&self.inner); + let logger_weak = Rc::downgrade(logger); + let future = Box::pin(async move { + let inner = inner_weak.upgrade().expect("Dangling!"); + let logger = logger_weak.upgrade().expect("Dangling!"); + let resource = weak_resource.upgrade().expect("Dangling!"); + let result = inner.add(&logger, Rc::clone(&resource), force_run).await; + + // Need to convert Box to String for Clone for Shared + result + .map(|(t, did_run)| (As::from_artifact(t), did_run)) + .map_err(|e| e.to_string()) + }) + .shared(); + let future_clone = future.clone(); + resources.insert( + storable_resource, + (Box::pin(async move { + let result = future_clone.await; + if result.is_err() { + // Step back to give the initial caller time to handle the error before unwrapping + sleep(Duration::from_millis(0)).await; + } + result.unwrap() + }) as Pin>>) + .shared(), + ); + drop(resources); + let result = future.await; + result.map_err(std::convert::Into::into) + }; + result.map(|(t, did_run)| (t.into_artifact(), did_run)) + } +} diff --git a/src/setup/core.rs b/src/setup/core.rs deleted file mode 100644 index 4b911a1..0000000 --- a/src/setup/core.rs +++ /dev/null @@ -1,215 +0,0 @@ -use super::runnable::Runnable; -use super::setup::Setup; -use super::util::{AddResult, AddableResource}; -use super::SymbolRunner; -use crate::async_utils::join; -use crate::resources::{FromArtifact, FromResource}; -use crate::symbols::Symbol; -use crate::to_artifact::ToArtifact; -use crate::{ImplementationBuilder, ResourceLocator}; -use async_trait::async_trait; -use slog::{debug, o, trace, Logger}; -use std::error::Error; -use std::fmt::Debug; -use std::hash::Hash; -use std::marker::PhantomData; -use std::rc::Rc; - -#[async_trait(?Send)] -pub trait AddGeneric { - async fn add_generic(&self, logger: &Rc, x: X) -> AddResult; -} - -macro_rules! add_generic { - ( $($name:ident)* ) => ( - #[async_trait(?Send)] - #[allow(non_snake_case)] - impl - AddGeneric<($($name,)*)> for Setup - where - $( - RegularSetupCore: SetupCore<$name, Self>, - As: FromArtifact<$name>, - Rs: FromResource<$name>, - $name::Artifact: Clone - ),* - { - #[allow(unused, clippy::shadow_unrelated)] - async fn add_generic(&self, logger: &Rc, ($($name,)*): ($($name,)*)) -> AddResult<($($name,)*)> - { - let ($($name,)*) = join!($(self.add(logger, $name, false),)*); - let mut did_run_any = false; - $( - let (artifact, did_run) = $name?; - did_run_any = did_run_any || did_run; - let $name = artifact; - )* - Ok((($($name,)*), did_run_any)) - } - } - ); -} - -for_each_tuple!(add_generic); - -// This is for self-referential T -// FIXME: Wait for specialization -#[async_trait(?Send)] -impl< - SR: 'static + SymbolRunner, - T: AddableResource, - Rs: 'static + Hash + Eq + FromResource, - As: 'static + FromArtifact + Clone, - L: 'static + ResourceLocator>, - B: 'static + ImplementationBuilder, - > AddGeneric> for Setup -where - >::Implementation: Runnable + Debug, - Self: AddGeneric, - T::Artifact: Clone, - // These bounds cannot be replaced by - // `RegularSetupCore: SetupCore` - // because the prerequisites are Option, too, and thus this would - // require AddGeneric> to already be implemented -{ - async fn add_generic(&self, logger: &Rc, r: Option) -> AddResult> { - Ok(match r { - Some(r) => { - let (result, did_run) = self.add(logger, r, false).await?; - (Some(result), did_run) - } - None => (None, false), - }) - } -} - -#[async_trait(?Send)] -impl< - T: AddableResource, - Rs: 'static + Hash + Eq + FromResource, - As: 'static + FromArtifact + Clone, - SR: 'static, - L: 'static, - B: 'static, - > AddGeneric for Setup -where - T::Artifact: Clone, - RegularSetupCore: 'static + SetupCore, -{ - async fn add_generic(&self, logger: &Rc, r: T) -> AddResult { - self.add(logger, r, false).await - } -} - -#[async_trait(?Send)] -pub trait SetupCore { - async fn add( - &self, - setup: &S, - logger: &Rc, - resource: impl AsRef, - force_run: bool, - ) -> AddResult; -} - -#[derive(Debug)] -pub struct RegularSetupCore { - symbol_runner: SR, - phantom: PhantomData<(L, B)>, -} - -impl RegularSetupCore { - pub fn new(symbol_runner: SR) -> Self { - Self { - symbol_runner, - phantom: PhantomData::default(), - } - } -} - -#[async_trait(?Send)] -impl SetupCore for RegularSetupCore -where - B: ImplementationBuilder, - >::Implementation: Runnable + Debug, - L: ResourceLocator, - S: AddGeneric + AddGeneric<>::Prerequisites>, -{ - async fn add( - &self, - setup: &S, - logger: &Rc, - resource: impl AsRef, - force_run: bool, - ) -> AddResult { - let resource = resource.as_ref(); - let logger = Rc::new(logger.new(o!("resource" => format!("{resource:?}")))); - trace!(logger, "(force_run is {})", force_run); - let (location, location_prereqs) = L::locate(resource); - trace!(logger, "Adding location prereqs ..."); - let (_, location_prereqs_did_run) = setup.add_generic(&logger, location_prereqs).await?; - trace!( - logger, - "Location prereqs did_run: {}", - location_prereqs_did_run - ); - - trace!(logger, "Adding implementation prereqs ..."); - let (prereqs, prereqs_did_run) = setup - .add_generic(&logger, B::prerequisites(resource)) - .await?; - trace!( - logger, - "Implementation prereqs did_run: {}", - prereqs_did_run - ); - - trace!(logger, "Running implementation ..."); - let implementation = B::create(resource, &location, prereqs); - let did_run = implementation - .run( - &self.symbol_runner, - &logger, - force_run || location_prereqs_did_run || prereqs_did_run, - ) - .await?; - debug!(logger, "done."); - - Ok((location, did_run)) - } -} - -#[async_trait(?Send)] -impl SymbolRunner for RegularSetupCore { - async fn run_symbol( - &self, - symbol: &S, - logger: &Logger, - force: bool, - ) -> Result> { - debug!(logger, "Directly running {:?} ...", symbol); - let result = self.symbol_runner.run_symbol(symbol, logger, force).await; - debug!(logger, "done."); - result - } -} - -#[cfg(test)] -mod test { - use super::super::setup::Setup; - use super::AddGeneric; - use crate::async_utils::run; - use crate::DrySymbolRunner; - use std::rc::Rc; - - #[test] - fn empty() { - let setup: Setup<_, (), (), (), ()> = Setup::new(DrySymbolRunner); - run(async { - assert!(setup - .add_generic(&Rc::new(slog::Logger::root(slog::Discard, slog::o!())), ()) - .await - .is_ok()); - }) - } -} diff --git a/src/setup/mod.rs b/src/setup/mod.rs index 6392610..8302d77 100644 --- a/src/setup/mod.rs +++ b/src/setup/mod.rs @@ -1,11 +1,388 @@ -mod core; +mod realizer; mod symbol_runner; mod util; pub use symbol_runner::{ DelayingSymbolRunner, DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner, SymbolRunner, }; +mod cache; mod runnable; -#[allow(clippy::module_inception)] -mod setup; -pub use setup::SetupFacade as Setup; +use crate::loggers::Logger; +use crate::resources::{DefaultArtifacts, DefaultResources, FromArtifact, FromResource}; +use crate::symbols::Symbol; +use crate::{DefaultBuilder, DefaultLocator}; +use crate::{ImplementationBuilder, ResourceLocator}; +use async_trait::async_trait; +use cache::Cache; +use realizer::Realizer; +use runnable::Runnable; +use slog::o; +use std::error::Error; +use std::fmt::Debug; +use std::hash::Hash; +use std::rc::Rc; +use util::{Add, AddGeneric, AddResult, AddableResource, Recorder}; + +// Necessary for the recursive type +#[derive(Debug)] +pub struct ActualSetup(Cache, Rs, As>); + +#[async_trait(?Send)] +impl Add for ActualSetup +where + Cache, Rs, As>: Add, +{ + async fn add(&self, logger: &Rc, r: Rc, force_run: bool) -> AddResult { + self.0.add(logger, r, force_run).await + } +} + +// This is for self-referential T +// FIXME: Wait for specialization +#[async_trait(?Send)] +impl< + SR: 'static + SymbolRunner, + T: AddableResource, + Rs: 'static + Hash + Eq + FromResource, + As: 'static + FromArtifact + Clone, + L: 'static + ResourceLocator>, + B: 'static + ImplementationBuilder, + > AddGeneric> for ActualSetup +where + >::Implementation: Runnable + Debug, + Self: AddGeneric, + T::Artifact: Clone, + // These bounds cannot be replaced by + // `Realizer: Add` + // because the prerequisites are Option, too, and thus this would + // require AddGeneric> to already be implemented +{ + async fn add_generic( + &self, + logger: &Rc, + r: Option, + force_run: bool, + ) -> AddResult> { + Ok(match r { + Some(r) => { + let (result, did_run) = self.add(logger, Rc::new(r), force_run).await?; + (Some(result), did_run) + } + None => (None, false), + }) + } +} + +#[derive(Debug)] +pub struct Setup< + SR, + LOG, + L = DefaultLocator, + B = DefaultBuilder, + Rs = DefaultResources<'static, &'static str>, + As = DefaultArtifacts<'static, &'static str>, +>(LOG, Rc, Rc>); + +impl Setup { + pub fn new(symbol_runner: SR, logger: LOG) -> Self { + Self::new_with(symbol_runner, logger) + } +} + +impl Setup { + pub fn new_with(symbol_runner: SR, logger: LOG) -> Self { + let runner = Rc::new(symbol_runner); + let inner = Rc::new_cyclic(|inner| { + ActualSetup(Cache::new(Realizer::new(Rc::clone(&runner), inner.clone()))) + }); + Self(logger, runner, inner) + } +} + +impl< + SR: 'static, + LOG: 'static + Logger, + L: 'static, + B: 'static, + Rs: 'static + Eq + Hash, + As: 'static, + > Setup +{ + pub async fn add_force(&self, resource: R, force_run: bool) -> AddResult + where + Rs: FromResource, + As: FromArtifact + Clone, + ActualSetup: Add, + { + let recorder = Recorder::default(); + let result = { + let log = Rc::new(slog::Logger::root(recorder.clone(), o!())); + self.2.add(&log, Rc::new(resource), force_run).await + }; + self.log_result(recorder, result.as_ref().map(|(_, did_run)| *did_run)); + result + } + + pub async fn add(&self, resource: R) -> AddResult + where + Rs: FromResource, + As: FromArtifact + Clone, + R::Artifact: Clone, + ActualSetup: Add, + { + self.add_force(resource, false).await + } + + pub async fn run_symbol( + &self, + symbol: S, + force: bool, + ) -> Result> + where + SR: SymbolRunner, + { + let recorder = Recorder::default(); + let result = { + let log = Rc::new(slog::Logger::root( + recorder.clone(), + o!("symbol" => format!("{symbol:?}")), + )); + self.1.run_symbol(&symbol, &log, force).await + }; + self.log_result(recorder, result.as_ref().copied()); + result + } + + fn log_result(&self, recorder: Recorder, result: Result>) { + let log = match result { + Ok(false) => String::new(), + Ok(true) => recorder.into_string(slog::Level::Info), + Err(e) => recorder.into_string(slog::Level::Trace), + }; + if log.is_empty() { + self.0.write(3, "."); + } else { + self.0.writeln(3, &log); + } + } +} + +#[cfg(test)] +mod test { + use super::SymbolRunner; + use crate::async_utils::run; + use crate::loggers::{Entry, StoringLogger}; + use crate::resources::{FromArtifact, FromResource, Resource}; + use crate::symbols::Symbol; + use crate::to_artifact::ToArtifact; + use crate::{ImplementationBuilder, ResourceLocator, Setup}; + use async_trait::async_trait; + use regex::Regex; + use slog::{info, Logger}; + use std::cell::RefCell; + use std::error::Error; + use std::fmt::Debug; + use std::rc::{Rc, Weak}; + + struct TestSymbolRunner { + count: Rc>, + } + + #[async_trait(?Send)] + impl SymbolRunner for TestSymbolRunner { + async fn run_symbol( + &self, + symbol: &S, + logger: &Logger, + force: bool, + ) -> Result> { + info!(logger, "run"); + let run = force || !symbol.target_reached().await?; + if run { + *self.count.borrow_mut() += 1; + } + Ok(run) + } + } + + #[derive(Debug, PartialEq, Eq, Hash)] + struct TestResource(&'static str, T); + impl Resource for TestResource { + type Artifact = (); + } + + #[derive(Debug, Hash, PartialEq, Eq)] + enum Resources { + A(Rc>), + B(Rc>), + } + impl FromResource> for Resources { + fn from_resource( + inner: &Rc>, + ) -> (Self, Weak>) { + (Self::A(Rc::clone(&inner)), Rc::downgrade(&inner)) + } + } + impl FromResource> for Resources { + fn from_resource(inner: &Rc>) -> (Self, Weak>) { + (Self::B(Rc::clone(&inner)), Rc::downgrade(&inner)) + } + } + + #[derive(Clone)] + struct Artifacts; + impl FromArtifact> for Artifacts { + fn from_artifact(_from: ()) -> Self { + Self + } + #[allow(clippy::unused_unit)] + fn into_artifact(self) -> () { + #[allow(clippy::unused_unit)] + () + } + } + + struct TestResourceLocator; + impl ResourceLocator> for TestResourceLocator { + type Prerequisites = (); + fn locate(_resource: &TestResource) -> ( as ToArtifact>::Artifact, ()) { + ((), ()) + } + } + + struct TestImplementationBuilder; + impl ImplementationBuilder> for TestImplementationBuilder { + type Implementation = TestSymbol; + type Prerequisites = TestResource<()>; + + fn prerequisites(resource: &TestResource<&'static str>) -> Self::Prerequisites { + TestResource(resource.1, ()) + } + fn create( + resource: &TestResource<&'static str>, + (): &(), + _inputs: ::Artifact, + ) -> Self::Implementation { + TestSymbol { + reached: resource.0.chars().next().unwrap().is_uppercase(), + } + } + } + impl ImplementationBuilder> for TestImplementationBuilder { + type Implementation = TestSymbol; + type Prerequisites = (); + + fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {} + fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation { + TestSymbol { + reached: resource.0.chars().next().unwrap().is_uppercase(), + } + } + } + + #[derive(Debug)] + struct TestSymbol { + reached: bool, + } + + #[async_trait(?Send)] + impl Symbol for TestSymbol { + async fn target_reached(&self) -> Result> { + Ok(self.reached) + } + async fn execute(&self) -> Result<(), Box> { + Ok(()) + } + } + + #[allow(clippy::type_complexity)] + fn get_setup() -> ( + Rc>, + Setup< + TestSymbolRunner, + StoringLogger, + TestResourceLocator, + TestImplementationBuilder, + Resources, + Artifacts, + >, + StoringLogger, + ) { + let count = Rc::new(RefCell::new(0)); + let runner = TestSymbolRunner { + count: Rc::clone(&count), + }; + let logger = StoringLogger::new(); + (count, Setup::new_with(runner, logger.clone()), logger) + } + + #[test] + fn correctly_uses_force() { + run(async { + let (count, setup, _) = get_setup(); + setup.add(TestResource("A", "b")).await.unwrap(); + assert_eq!(*count.borrow(), 2); + setup.add(TestResource("A", "b")).await.unwrap(); + assert_eq!(*count.borrow(), 2); + + let (count, setup, _) = get_setup(); + setup.add(TestResource("A", "B")).await.unwrap(); + assert_eq!(*count.borrow(), 0); + }); + } + + #[test] + fn run_reached_symbol() { + run(async { + let (count, setup, log) = get_setup(); + let did_run = setup + .run_symbol(TestSymbol { reached: true }, false) + .await + .unwrap(); + drop(setup); + assert!(!did_run); + assert_eq!(*count.borrow(), 0); + assert_eq!(log.release(), vec![Entry(3, ".".into())]); + }); + } + + #[test] + fn run_not_reached_symbol() { + run(async { + let (count, setup, log) = get_setup(); + let did_run = setup + .run_symbol(TestSymbol { reached: false }, false) + .await + .unwrap(); + drop(setup); + assert!(did_run); + assert_eq!(*count.borrow(), 1); + let log = log.release(); + assert_eq!(log.len(), 1); + assert_eq!(log[0].0, 3); + let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap(); + assert!(re.is_match(&log[0].1)); + }); + } + + use super::{ActualSetup, AddGeneric, Cache, Realizer}; + + #[test] + fn empty_tuple_add_generic() { + let setup = Rc::new_cyclic(|inner| { + ActualSetup(Cache::, (), ()>::new( + Realizer::new(Rc::new(()), inner.clone()), + )) + }); + run(async { + assert!(setup + .add_generic( + &Rc::new(slog::Logger::root(slog::Discard, slog::o!())), + (), + false + ) + .await + .is_ok()); + }) + } +} diff --git a/src/setup/realizer.rs b/src/setup/realizer.rs new file mode 100644 index 0000000..2757186 --- /dev/null +++ b/src/setup/realizer.rs @@ -0,0 +1,74 @@ +use super::{Add, AddGeneric, AddResult, AddableResource, Runnable, SymbolRunner}; +use crate::{ImplementationBuilder, ResourceLocator}; +use async_trait::async_trait; +use slog::{debug, o, trace, Logger}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::rc::{Rc, Weak}; + +#[derive(Debug)] +pub struct Realizer { + symbol_runner: Rc, + outer: Weak, + phantom: PhantomData<(L, B)>, +} + +impl Realizer { + pub fn new(symbol_runner: Rc, outer: Weak) -> Self { + Self { + symbol_runner, + outer, + phantom: PhantomData::default(), + } + } +} + +#[async_trait(?Send)] +impl Add for Realizer +where + R: AddableResource, + SR: SymbolRunner, + L: ResourceLocator, + B: ImplementationBuilder, + >::Implementation: Runnable + Debug, + S: AddGeneric + AddGeneric<>::Prerequisites>, +{ + async fn add(&self, logger: &Rc, resource: Rc, force_run: bool) -> AddResult { + let setup = self.outer.upgrade().unwrap(); + let logger = Rc::new(logger.new(o!("resource" => format!("{resource:?}")))); + trace!(logger, "(force_run is {})", force_run); + let (location, location_prereqs) = L::locate(&resource); + trace!(logger, "Adding location prereqs ..."); + let (_, location_prereqs_did_run) = (*setup) + .add_generic(&logger, location_prereqs, false) + .await?; + trace!( + logger, + "Location prereqs did_run: {}", + location_prereqs_did_run + ); + + trace!(logger, "Adding implementation prereqs ..."); + let (prereqs, prereqs_did_run) = (*setup) + .add_generic(&logger, B::prerequisites(&resource), false) + .await?; + trace!( + logger, + "Implementation prereqs did_run: {}", + prereqs_did_run + ); + + trace!(logger, "Running implementation ..."); + let implementation = B::create(&resource, &location, prereqs); + let did_run = implementation + .run( + &*self.symbol_runner, + &logger, + force_run || location_prereqs_did_run || prereqs_did_run, + ) + .await?; + debug!(logger, "done."); + + Ok((location, did_run)) + } +} diff --git a/src/setup/setup.rs b/src/setup/setup.rs deleted file mode 100644 index 976d17e..0000000 --- a/src/setup/setup.rs +++ /dev/null @@ -1,390 +0,0 @@ -use super::core::{RegularSetupCore, SetupCore}; -use super::util::{AddResult, AddableResource, Recorder}; -use super::SymbolRunner; -use crate::async_utils::sleep; -use crate::loggers::Logger; -use crate::resources::{DefaultArtifacts, DefaultResources, FromArtifact, FromResource}; -use crate::symbols::Symbol; -use crate::{DefaultBuilder, DefaultLocator}; -use futures_util::future::{FutureExt, Shared}; -use slog::{o, trace, Logger as SlogLogger}; -use std::cell::RefCell; -use std::collections::HashMap; -use std::error::Error; -use std::fmt::Debug; -use std::future::Future; -use std::hash::Hash; -use std::pin::Pin; -use std::rc::Rc; -use std::time::Duration; - -type Cache = HashMap>>>>; - -#[derive(Debug)] -struct SetupInner { - core: CORE, - resources: RefCell>, -} - -#[derive(Debug)] -pub struct Setup(Rc, Rs, As>>); - -impl Setup { - pub fn new(symbol_runner: SR) -> Self { - Self(Rc::new(SetupInner { - core: RegularSetupCore::new(symbol_runner), - resources: RefCell::default(), - })) - } -} -impl - Setup -{ - // FIXME: https://github.com/rust-lang/rust-clippy/issues/6353 - #[allow(clippy::await_holding_refcell_ref)] - pub async fn add( - &self, - logger: &Rc, - resource: R, - force_run: bool, - ) -> AddResult - where - Rs: FromResource, - As: FromArtifact + Clone, - RegularSetupCore: SetupCore, - { - let (storable_resource, weak_resource) = Rs::from_resource(resource); - let mut resources = self.0.resources.borrow_mut(); - let result = if let Some(future) = resources.remove(&storable_resource) { - assert!( - !force_run, - "Forcing to run an already-added resource is a logical error" - ); - resources.insert(storable_resource, future.clone()); - drop(resources); - trace!(logger, "Resource already added"); - Ok(future.await) - } else { - let inner_weak = Rc::downgrade(&self.0); - let logger_weak = Rc::downgrade(logger); - let future = Box::pin(async move { - let this = Self(inner_weak.upgrade().expect("Dangling!")); - let logger = logger_weak.upgrade().expect("Dangling!"); - let resource = weak_resource.upgrade().expect("Dangling!"); - // Need to convert Box to String for Clone for Shared - let result = this.0.core.add(&this, &logger, resource, force_run).await; - - result - .map(|(t, did_run)| (As::from_artifact(t), did_run)) - .map_err(|e| e.to_string()) - }) - .shared(); - let future_clone = future.clone(); - resources.insert( - storable_resource, - (Box::pin(async move { - let result = future_clone.await; - if result.is_err() { - // Step back to give the initial caller time to handle the error before unwrapping - sleep(Duration::from_millis(0)).await; - } - result.unwrap() - }) as Pin>>) - .shared(), - ); - drop(resources); - let result = future.await; - result.map_err(std::convert::Into::into) - }; - result.map(|(t, did_run)| (t.into_artifact(), did_run)) - } -} - -#[derive(Debug)] -pub struct SetupFacade< - SR, - LOG, - L = DefaultLocator, - B = DefaultBuilder, - Rs = DefaultResources<'static, &'static str>, - As = DefaultArtifacts<'static, &'static str>, ->(LOG, Setup); - -impl SetupFacade { - pub fn new(symbol_runner: SR, logger: LOG) -> Self { - Self::new_with(symbol_runner, logger) - } -} - -impl SetupFacade { - pub fn new_with(symbol_runner: SR, logger: LOG) -> Self { - Self(logger, Setup::new(symbol_runner)) - } -} - -impl< - SR: 'static, - LOG: 'static + Logger, - L: 'static, - B: 'static, - Rs: 'static + Eq + Hash, - As: 'static, - > SetupFacade -{ - pub async fn add_force(&self, resource: R, force_run: bool) -> AddResult - where - Rs: FromResource, - As: FromArtifact + Clone, - RegularSetupCore: SetupCore>, - { - let recorder = Recorder::default(); - let result = { - let log = Rc::new(slog::Logger::root(recorder.clone(), o!())); - self.1.add(&log, resource, force_run).await - }; - self.log_result(recorder, result.as_ref().map(|(_, did_run)| *did_run)); - result - } - - pub async fn add(&self, resource: R) -> AddResult - where - Rs: FromResource, - As: FromArtifact + Clone, - R::Artifact: Clone, - RegularSetupCore: SetupCore>, - { - self.add_force(resource, false).await - } - - pub async fn run_symbol( - &self, - symbol: S, - force: bool, - ) -> Result> - where - RegularSetupCore: SymbolRunner, - { - let recorder = Recorder::default(); - let result = { - let log = Rc::new(slog::Logger::root( - recorder.clone(), - o!("symbol" => format!("{symbol:?}")), - )); - (self.1).0.core.run_symbol(&symbol, &log, force).await - }; - self.log_result(recorder, result.as_ref().copied()); - result - } - - fn log_result(&self, recorder: Recorder, result: Result>) { - let log = match result { - Ok(false) => String::new(), - Ok(true) => recorder.into_string(slog::Level::Info), - Err(e) => recorder.into_string(slog::Level::Trace), - }; - if log.is_empty() { - self.0.write(3, "."); - } else { - self.0.writeln(3, &log); - } - } -} - -#[cfg(test)] -mod test { - use super::SymbolRunner; - use crate::async_utils::run; - use crate::loggers::{Entry, StoringLogger}; - use crate::resources::{FromArtifact, FromResource, Resource}; - use crate::symbols::Symbol; - use crate::to_artifact::ToArtifact; - use crate::{ImplementationBuilder, ResourceLocator, Setup}; - use async_trait::async_trait; - use regex::Regex; - use slog::{info, Logger}; - use std::cell::RefCell; - use std::error::Error; - use std::fmt::Debug; - use std::rc::{Rc, Weak}; - - struct TestSymbolRunner { - count: Rc>, - } - - #[async_trait(?Send)] - impl SymbolRunner for TestSymbolRunner { - async fn run_symbol( - &self, - symbol: &S, - logger: &Logger, - force: bool, - ) -> Result> { - info!(logger, "run"); - let run = force || !symbol.target_reached().await?; - if run { - *self.count.borrow_mut() += 1; - } - Ok(run) - } - } - - #[derive(Debug, PartialEq, Eq, Hash)] - struct TestResource(&'static str, T); - impl Resource for TestResource { - type Artifact = (); - } - - #[derive(Debug, Hash, PartialEq, Eq)] - enum Resources { - A(Rc>), - B(Rc>), - } - impl FromResource> for Resources { - fn from_resource(from: TestResource<&'static str>) -> (Self, Weak>) { - let inner = Rc::new(from); - (Self::A(Rc::clone(&inner)), Rc::downgrade(&inner)) - } - } - impl FromResource> for Resources { - fn from_resource(from: TestResource<()>) -> (Self, Weak>) { - let inner = Rc::new(from); - (Self::B(Rc::clone(&inner)), Rc::downgrade(&inner)) - } - } - - #[derive(Clone)] - struct Artifacts; - impl FromArtifact> for Artifacts { - fn from_artifact(_from: ()) -> Self { - Self - } - #[allow(clippy::unused_unit)] - fn into_artifact(self) -> () { - #[allow(clippy::unused_unit)] - () - } - } - - struct TestResourceLocator; - impl ResourceLocator> for TestResourceLocator { - type Prerequisites = (); - fn locate(_resource: &TestResource) -> ( as ToArtifact>::Artifact, ()) { - ((), ()) - } - } - - struct TestImplementationBuilder; - impl ImplementationBuilder> for TestImplementationBuilder { - type Implementation = TestSymbol; - type Prerequisites = TestResource<()>; - - fn prerequisites(resource: &TestResource<&'static str>) -> Self::Prerequisites { - TestResource(resource.1, ()) - } - fn create( - resource: &TestResource<&'static str>, - (): &(), - _inputs: ::Artifact, - ) -> Self::Implementation { - TestSymbol { - reached: resource.0.chars().next().unwrap().is_uppercase(), - } - } - } - impl ImplementationBuilder> for TestImplementationBuilder { - type Implementation = TestSymbol; - type Prerequisites = (); - - fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {} - fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation { - TestSymbol { - reached: resource.0.chars().next().unwrap().is_uppercase(), - } - } - } - - #[derive(Debug)] - struct TestSymbol { - reached: bool, - } - - #[async_trait(?Send)] - impl Symbol for TestSymbol { - async fn target_reached(&self) -> Result> { - Ok(self.reached) - } - async fn execute(&self) -> Result<(), Box> { - Ok(()) - } - } - - #[allow(clippy::type_complexity)] - fn get_setup() -> ( - Rc>, - Setup< - TestSymbolRunner, - StoringLogger, - TestResourceLocator, - TestImplementationBuilder, - Resources, - Artifacts, - >, - StoringLogger, - ) { - let count = Rc::new(RefCell::new(0)); - let runner = TestSymbolRunner { - count: Rc::clone(&count), - }; - let logger = StoringLogger::new(); - (count, Setup::new_with(runner, logger.clone()), logger) - } - - #[test] - fn correctly_uses_force() { - run(async { - let (count, setup, _) = get_setup(); - setup.add(TestResource("A", "b")).await.unwrap(); - assert_eq!(*count.borrow(), 2); - setup.add(TestResource("A", "b")).await.unwrap(); - assert_eq!(*count.borrow(), 2); - - let (count, setup, _) = get_setup(); - setup.add(TestResource("A", "B")).await.unwrap(); - assert_eq!(*count.borrow(), 0); - }); - } - - #[test] - fn run_reached_symbol() { - run(async { - let (count, setup, log) = get_setup(); - let did_run = setup - .run_symbol(TestSymbol { reached: true }, false) - .await - .unwrap(); - drop(setup); - assert!(!did_run); - assert_eq!(*count.borrow(), 0); - assert_eq!(log.release(), vec![Entry(3, ".".into())]); - }); - } - - #[test] - fn run_not_reached_symbol() { - run(async { - let (count, setup, log) = get_setup(); - let did_run = setup - .run_symbol(TestSymbol { reached: false }, false) - .await - .unwrap(); - drop(setup); - assert!(did_run); - assert_eq!(*count.borrow(), 1); - let log = log.release(); - assert_eq!(log.len(), 1); - assert_eq!(log[0].0, 3); - let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap(); - assert!(re.is_match(&log[0].1)); - }); - } -} diff --git a/src/setup/util.rs b/src/setup/util.rs index 4602f2d..6951165 100644 --- a/src/setup/util.rs +++ b/src/setup/util.rs @@ -1,6 +1,8 @@ +use crate::async_utils::join; use crate::resources::Resource; use crate::to_artifact::ToArtifact; -use slog::{Drain, Filter, OwnedKVList, Record}; +use async_trait::async_trait; +use slog::{Drain, Filter, Logger, OwnedKVList, Record}; use slog_async::AsyncRecord; use std::cell::RefCell; use std::error::Error; @@ -14,6 +16,52 @@ impl AddableResource for R where R: 'static + Resource + Debug {} pub type AddResult = Result<(::Artifact, bool), Box>; +#[async_trait(?Send)] +pub trait Add { + async fn add(&self, logger: &Rc, resource: Rc, force_run: bool) -> AddResult; +} + +#[async_trait(?Send)] +pub trait AddGeneric { + async fn add_generic(&self, logger: &Rc, x: X, force_run: bool) -> AddResult; +} + +macro_rules! add_generic { + ( $($name:ident)* ) => ( + #[async_trait(?Send)] + #[allow(non_snake_case)] + impl<_S, $($name: AddableResource,)*> + AddGeneric<($($name,)*)> for _S + where + $( + _S: AddGeneric<$name> + ),* + { + #[allow(unused, clippy::shadow_unrelated)] + async fn add_generic(&self, logger: &Rc, ($($name,)*): ($($name,)*), force_run: bool) -> AddResult<($($name,)*)> + { + let ($($name,)*) = join!($(self.add_generic(logger, $name, force_run),)*); + let mut did_run_any = false; + $( + let (artifact, did_run) = $name?; + did_run_any = did_run_any || did_run; + let $name = artifact; + )* + Ok((($($name,)*), did_run_any)) + } + } + ); +} + +for_each_tuple!(add_generic); + +#[async_trait(?Send)] +impl> AddGeneric for S { + async fn add_generic(&self, logger: &Rc, r: R, force_run: bool) -> AddResult { + self.add(logger, Rc::new(r), force_run).await + } +} + // From https://users.rust-lang.org/t/how-to-send-a-writer-into-a-thread/4965/10 #[derive(Clone)] struct Output(Rc>);