Browse Source

Untangle and move around setup structs

master
Adrian Heine 2 years ago
parent
commit
ac1c06dd31
  1. 5
      src/resources/mod.rs
  2. 88
      src/setup/cache.rs
  3. 215
      src/setup/core.rs
  4. 385
      src/setup/mod.rs
  5. 74
      src/setup/realizer.rs
  6. 390
      src/setup/setup.rs
  7. 50
      src/setup/util.rs

5
src/resources/mod.rs

@ -228,7 +228,7 @@ impl<D> Resource for Cron<D> {
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
pub trait FromResource<R> { pub trait FromResource<R> {
fn from_resource(from: R) -> (Self, Weak<R>)
fn from_resource(from: &Rc<R>) -> (Self, Weak<R>)
where where
Self: Sized; Self: Sized;
} }
@ -250,8 +250,7 @@ macro_rules! default_resources {
} }
$(impl<'a, D> FromResource<$type> for DefaultResources<'a, D> { $(impl<'a, D> FromResource<$type> for DefaultResources<'a, D> {
fn from_resource(from: $type) -> (Self, Weak<$type>) {
let inner = Rc::new(from);
fn from_resource(inner: &Rc<$type>) -> (Self, Weak<$type>) {
(Self::$name(Rc::clone(&inner)), Rc::downgrade(&inner)) (Self::$name(Rc::clone(&inner)), Rc::downgrade(&inner))
} }
})* })*

88
src/setup/cache.rs

@ -0,0 +1,88 @@
use super::{Add, AddResult, AddableResource};
use crate::async_utils::sleep;
use crate::resources::{FromArtifact, FromResource};
use async_trait::async_trait;
use futures_util::future::{FutureExt, Shared};
use slog::{trace, Logger};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
type ResourceCache<Rs, As> = HashMap<Rs, Shared<Pin<Box<dyn Future<Output = (As, bool)>>>>>;
#[derive(Debug)]
pub struct Cache<I, Rs, As> {
resources: RefCell<ResourceCache<Rs, As>>,
inner: Rc<I>,
}
impl<I, Rs, As> Cache<I, Rs, As> {
pub fn new(inner: I) -> Self {
Self {
resources: RefCell::default(),
inner: Rc::new(inner),
}
}
}
#[async_trait(?Send)]
impl<R: AddableResource + Debug, I, Rs, As> Add<R> for Cache<I, Rs, As>
where
Rs: Hash + Eq + 'static + FromResource<R>,
As: 'static + FromArtifact<R> + Clone,
I: 'static + Add<R>,
{
// FIXME: https://github.com/rust-lang/rust-clippy/issues/6353
#[allow(clippy::await_holding_refcell_ref)]
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
let (storable_resource, weak_resource) = Rs::from_resource(&resource);
let mut resources = self.resources.borrow_mut();
let result = if let Some(future) = resources.remove(&storable_resource) {
assert!(
!force_run,
"Forcing to run an already-added resource is a logical error"
);
resources.insert(storable_resource, future.clone());
drop(resources);
trace!(logger, "Resource already added");
Ok(future.await)
} else {
let inner_weak = Rc::downgrade(&self.inner);
let logger_weak = Rc::downgrade(logger);
let future = Box::pin(async move {
let inner = inner_weak.upgrade().expect("Dangling!");
let logger = logger_weak.upgrade().expect("Dangling!");
let resource = weak_resource.upgrade().expect("Dangling!");
let result = inner.add(&logger, Rc::clone(&resource), force_run).await;
// Need to convert Box<Error> to String for Clone for Shared
result
.map(|(t, did_run)| (As::from_artifact(t), did_run))
.map_err(|e| e.to_string())
})
.shared();
let future_clone = future.clone();
resources.insert(
storable_resource,
(Box::pin(async move {
let result = future_clone.await;
if result.is_err() {
// Step back to give the initial caller time to handle the error before unwrapping
sleep(Duration::from_millis(0)).await;
}
result.unwrap()
}) as Pin<Box<dyn Future<Output = (As, bool)>>>)
.shared(),
);
drop(resources);
let result = future.await;
result.map_err(std::convert::Into::into)
};
result.map(|(t, did_run)| (t.into_artifact(), did_run))
}
}

215
src/setup/core.rs

@ -1,215 +0,0 @@
use super::runnable::Runnable;
use super::setup::Setup;
use super::util::{AddResult, AddableResource};
use super::SymbolRunner;
use crate::async_utils::join;
use crate::resources::{FromArtifact, FromResource};
use crate::symbols::Symbol;
use crate::to_artifact::ToArtifact;
use crate::{ImplementationBuilder, ResourceLocator};
use async_trait::async_trait;
use slog::{debug, o, trace, Logger};
use std::error::Error;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::rc::Rc;
#[async_trait(?Send)]
pub trait AddGeneric<X: ToArtifact> {
async fn add_generic(&self, logger: &Rc<Logger>, x: X) -> AddResult<X>;
}
macro_rules! add_generic {
( $($name:ident)* ) => (
#[async_trait(?Send)]
#[allow(non_snake_case)]
impl<SR: 'static, _L: 'static, _B: 'static, Rs: 'static + Hash + Eq, As: 'static + Clone, $($name: AddableResource,)*>
AddGeneric<($($name,)*)> for Setup<SR, _L, _B, Rs, As>
where
$(
RegularSetupCore<SR, _L, _B>: SetupCore<$name, Self>,
As: FromArtifact<$name>,
Rs: FromResource<$name>,
$name::Artifact: Clone
),*
{
#[allow(unused, clippy::shadow_unrelated)]
async fn add_generic(&self, logger: &Rc<Logger>, ($($name,)*): ($($name,)*)) -> AddResult<($($name,)*)>
{
let ($($name,)*) = join!($(self.add(logger, $name, false),)*);
let mut did_run_any = false;
$(
let (artifact, did_run) = $name?;
did_run_any = did_run_any || did_run;
let $name = artifact;
)*
Ok((($($name,)*), did_run_any))
}
}
);
}
for_each_tuple!(add_generic);
// This is for self-referential T
// FIXME: Wait for specialization
#[async_trait(?Send)]
impl<
SR: 'static + SymbolRunner,
T: AddableResource,
Rs: 'static + Hash + Eq + FromResource<T>,
As: 'static + FromArtifact<T> + Clone,
L: 'static + ResourceLocator<T, Prerequisites = Option<T>>,
B: 'static + ImplementationBuilder<T>,
> AddGeneric<Option<T>> for Setup<SR, L, B, Rs, As>
where
<B as ImplementationBuilder<T>>::Implementation: Runnable + Debug,
Self: AddGeneric<B::Prerequisites>,
T::Artifact: Clone,
// These bounds cannot be replaced by
// `RegularSetupCore<SR, L, B>: SetupCore<T, Self>`
// because the prerequisites are Option<T>, too, and thus this would
// require AddGeneric<Option<T>> to already be implemented
{
async fn add_generic(&self, logger: &Rc<Logger>, r: Option<T>) -> AddResult<Option<T>> {
Ok(match r {
Some(r) => {
let (result, did_run) = self.add(logger, r, false).await?;
(Some(result), did_run)
}
None => (None, false),
})
}
}
#[async_trait(?Send)]
impl<
T: AddableResource,
Rs: 'static + Hash + Eq + FromResource<T>,
As: 'static + FromArtifact<T> + Clone,
SR: 'static,
L: 'static,
B: 'static,
> AddGeneric<T> for Setup<SR, L, B, Rs, As>
where
T::Artifact: Clone,
RegularSetupCore<SR, L, B>: 'static + SetupCore<T, Self>,
{
async fn add_generic(&self, logger: &Rc<Logger>, r: T) -> AddResult<T> {
self.add(logger, r, false).await
}
}
#[async_trait(?Send)]
pub trait SetupCore<R: AddableResource, S> {
async fn add(
&self,
setup: &S,
logger: &Rc<Logger>,
resource: impl AsRef<R>,
force_run: bool,
) -> AddResult<R>;
}
#[derive(Debug)]
pub struct RegularSetupCore<SR, L, B> {
symbol_runner: SR,
phantom: PhantomData<(L, B)>,
}
impl<SR, L, B> RegularSetupCore<SR, L, B> {
pub fn new(symbol_runner: SR) -> Self {
Self {
symbol_runner,
phantom: PhantomData::default(),
}
}
}
#[async_trait(?Send)]
impl<SR: SymbolRunner, L, B, R: AddableResource, S> SetupCore<R, S> for RegularSetupCore<SR, L, B>
where
B: ImplementationBuilder<R>,
<B as ImplementationBuilder<R>>::Implementation: Runnable + Debug,
L: ResourceLocator<R>,
S: AddGeneric<B::Prerequisites> + AddGeneric<<L as ResourceLocator<R>>::Prerequisites>,
{
async fn add(
&self,
setup: &S,
logger: &Rc<Logger>,
resource: impl AsRef<R>,
force_run: bool,
) -> AddResult<R> {
let resource = resource.as_ref();
let logger = Rc::new(logger.new(o!("resource" => format!("{resource:?}"))));
trace!(logger, "(force_run is {})", force_run);
let (location, location_prereqs) = L::locate(resource);
trace!(logger, "Adding location prereqs ...");
let (_, location_prereqs_did_run) = setup.add_generic(&logger, location_prereqs).await?;
trace!(
logger,
"Location prereqs did_run: {}",
location_prereqs_did_run
);
trace!(logger, "Adding implementation prereqs ...");
let (prereqs, prereqs_did_run) = setup
.add_generic(&logger, B::prerequisites(resource))
.await?;
trace!(
logger,
"Implementation prereqs did_run: {}",
prereqs_did_run
);
trace!(logger, "Running implementation ...");
let implementation = B::create(resource, &location, prereqs);
let did_run = implementation
.run(
&self.symbol_runner,
&logger,
force_run || location_prereqs_did_run || prereqs_did_run,
)
.await?;
debug!(logger, "done.");
Ok((location, did_run))
}
}
#[async_trait(?Send)]
impl<SR: SymbolRunner, L, B> SymbolRunner for RegularSetupCore<SR, L, B> {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
debug!(logger, "Directly running {:?} ...", symbol);
let result = self.symbol_runner.run_symbol(symbol, logger, force).await;
debug!(logger, "done.");
result
}
}
#[cfg(test)]
mod test {
use super::super::setup::Setup;
use super::AddGeneric;
use crate::async_utils::run;
use crate::DrySymbolRunner;
use std::rc::Rc;
#[test]
fn empty() {
let setup: Setup<_, (), (), (), ()> = Setup::new(DrySymbolRunner);
run(async {
assert!(setup
.add_generic(&Rc::new(slog::Logger::root(slog::Discard, slog::o!())), ())
.await
.is_ok());
})
}
}

385
src/setup/mod.rs

@ -1,11 +1,388 @@
mod core;
mod realizer;
mod symbol_runner; mod symbol_runner;
mod util; mod util;
pub use symbol_runner::{ pub use symbol_runner::{
DelayingSymbolRunner, DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner, DelayingSymbolRunner, DrySymbolRunner, InitializingSymbolRunner, ReportingSymbolRunner,
SymbolRunner, SymbolRunner,
}; };
mod cache;
mod runnable; mod runnable;
#[allow(clippy::module_inception)]
mod setup;
pub use setup::SetupFacade as Setup;
use crate::loggers::Logger;
use crate::resources::{DefaultArtifacts, DefaultResources, FromArtifact, FromResource};
use crate::symbols::Symbol;
use crate::{DefaultBuilder, DefaultLocator};
use crate::{ImplementationBuilder, ResourceLocator};
use async_trait::async_trait;
use cache::Cache;
use realizer::Realizer;
use runnable::Runnable;
use slog::o;
use std::error::Error;
use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc;
use util::{Add, AddGeneric, AddResult, AddableResource, Recorder};
// Necessary for the recursive type
#[derive(Debug)]
pub struct ActualSetup<SR, L, B, Rs, As>(Cache<Realizer<SR, L, B, Self>, Rs, As>);
#[async_trait(?Send)]
impl<T: AddableResource + Debug, SR, L, B, Rs, As> Add<T> for ActualSetup<SR, L, B, Rs, As>
where
Cache<Realizer<SR, L, B, Self>, Rs, As>: Add<T>,
{
async fn add(&self, logger: &Rc<slog::Logger>, r: Rc<T>, force_run: bool) -> AddResult<T> {
self.0.add(logger, r, force_run).await
}
}
// This is for self-referential T
// FIXME: Wait for specialization
#[async_trait(?Send)]
impl<
SR: 'static + SymbolRunner,
T: AddableResource,
Rs: 'static + Hash + Eq + FromResource<T>,
As: 'static + FromArtifact<T> + Clone,
L: 'static + ResourceLocator<T, Prerequisites = Option<T>>,
B: 'static + ImplementationBuilder<T>,
> AddGeneric<Option<T>> for ActualSetup<SR, L, B, Rs, As>
where
<B as ImplementationBuilder<T>>::Implementation: Runnable + Debug,
Self: AddGeneric<B::Prerequisites>,
T::Artifact: Clone,
// These bounds cannot be replaced by
// `Realizer<SR, L, B>: Add<T, Self>`
// because the prerequisites are Option<T>, too, and thus this would
// require AddGeneric<Option<T>> to already be implemented
{
async fn add_generic(
&self,
logger: &Rc<slog::Logger>,
r: Option<T>,
force_run: bool,
) -> AddResult<Option<T>> {
Ok(match r {
Some(r) => {
let (result, did_run) = self.add(logger, Rc::new(r), force_run).await?;
(Some(result), did_run)
}
None => (None, false),
})
}
}
#[derive(Debug)]
pub struct Setup<
SR,
LOG,
L = DefaultLocator,
B = DefaultBuilder,
Rs = DefaultResources<'static, &'static str>,
As = DefaultArtifacts<'static, &'static str>,
>(LOG, Rc<SR>, Rc<ActualSetup<SR, L, B, Rs, As>>);
impl<SR, LOG> Setup<SR, LOG> {
pub fn new(symbol_runner: SR, logger: LOG) -> Self {
Self::new_with(symbol_runner, logger)
}
}
impl<L, B, As, SR, LOG, Rs: Hash + Eq> Setup<SR, LOG, L, B, Rs, As> {
pub fn new_with(symbol_runner: SR, logger: LOG) -> Self {
let runner = Rc::new(symbol_runner);
let inner = Rc::new_cyclic(|inner| {
ActualSetup(Cache::new(Realizer::new(Rc::clone(&runner), inner.clone())))
});
Self(logger, runner, inner)
}
}
impl<
SR: 'static,
LOG: 'static + Logger,
L: 'static,
B: 'static,
Rs: 'static + Eq + Hash,
As: 'static,
> Setup<SR, LOG, L, B, Rs, As>
{
pub async fn add_force<R: AddableResource>(&self, resource: R, force_run: bool) -> AddResult<R>
where
Rs: FromResource<R>,
As: FromArtifact<R> + Clone,
ActualSetup<SR, L, B, Rs, As>: Add<R>,
{
let recorder = Recorder::default();
let result = {
let log = Rc::new(slog::Logger::root(recorder.clone(), o!()));
self.2.add(&log, Rc::new(resource), force_run).await
};
self.log_result(recorder, result.as_ref().map(|(_, did_run)| *did_run));
result
}
pub async fn add<R: AddableResource>(&self, resource: R) -> AddResult<R>
where
Rs: FromResource<R>,
As: FromArtifact<R> + Clone,
R::Artifact: Clone,
ActualSetup<SR, L, B, Rs, As>: Add<R>,
{
self.add_force(resource, false).await
}
pub async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: S,
force: bool,
) -> Result<bool, Box<dyn Error>>
where
SR: SymbolRunner,
{
let recorder = Recorder::default();
let result = {
let log = Rc::new(slog::Logger::root(
recorder.clone(),
o!("symbol" => format!("{symbol:?}")),
));
self.1.run_symbol(&symbol, &log, force).await
};
self.log_result(recorder, result.as_ref().copied());
result
}
fn log_result(&self, recorder: Recorder, result: Result<bool, &Box<dyn Error>>) {
let log = match result {
Ok(false) => String::new(),
Ok(true) => recorder.into_string(slog::Level::Info),
Err(e) => recorder.into_string(slog::Level::Trace),
};
if log.is_empty() {
self.0.write(3, ".");
} else {
self.0.writeln(3, &log);
}
}
}
#[cfg(test)]
mod test {
use super::SymbolRunner;
use crate::async_utils::run;
use crate::loggers::{Entry, StoringLogger};
use crate::resources::{FromArtifact, FromResource, Resource};
use crate::symbols::Symbol;
use crate::to_artifact::ToArtifact;
use crate::{ImplementationBuilder, ResourceLocator, Setup};
use async_trait::async_trait;
use regex::Regex;
use slog::{info, Logger};
use std::cell::RefCell;
use std::error::Error;
use std::fmt::Debug;
use std::rc::{Rc, Weak};
struct TestSymbolRunner {
count: Rc<RefCell<usize>>,
}
#[async_trait(?Send)]
impl SymbolRunner for TestSymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
info!(logger, "run");
let run = force || !symbol.target_reached().await?;
if run {
*self.count.borrow_mut() += 1;
}
Ok(run)
}
}
#[derive(Debug, PartialEq, Eq, Hash)]
struct TestResource<T>(&'static str, T);
impl<T> Resource for TestResource<T> {
type Artifact = ();
}
#[derive(Debug, Hash, PartialEq, Eq)]
enum Resources {
A(Rc<TestResource<&'static str>>),
B(Rc<TestResource<()>>),
}
impl FromResource<TestResource<&'static str>> for Resources {
fn from_resource(
inner: &Rc<TestResource<&'static str>>,
) -> (Self, Weak<TestResource<&'static str>>) {
(Self::A(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
impl FromResource<TestResource<()>> for Resources {
fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
(Self::B(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
#[derive(Clone)]
struct Artifacts;
impl<V> FromArtifact<TestResource<V>> for Artifacts {
fn from_artifact(_from: ()) -> Self {
Self
}
#[allow(clippy::unused_unit)]
fn into_artifact(self) -> () {
#[allow(clippy::unused_unit)]
()
}
}
struct TestResourceLocator;
impl<T> ResourceLocator<TestResource<T>> for TestResourceLocator {
type Prerequisites = ();
fn locate(_resource: &TestResource<T>) -> (<TestResource<T> as ToArtifact>::Artifact, ()) {
((), ())
}
}
struct TestImplementationBuilder;
impl ImplementationBuilder<TestResource<&'static str>> for TestImplementationBuilder {
type Implementation = TestSymbol;
type Prerequisites = TestResource<()>;
fn prerequisites(resource: &TestResource<&'static str>) -> Self::Prerequisites {
TestResource(resource.1, ())
}
fn create(
resource: &TestResource<&'static str>,
(): &(),
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
) -> Self::Implementation {
TestSymbol {
reached: resource.0.chars().next().unwrap().is_uppercase(),
}
}
}
impl ImplementationBuilder<TestResource<()>> for TestImplementationBuilder {
type Implementation = TestSymbol;
type Prerequisites = ();
fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {}
fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation {
TestSymbol {
reached: resource.0.chars().next().unwrap().is_uppercase(),
}
}
}
#[derive(Debug)]
struct TestSymbol {
reached: bool,
}
#[async_trait(?Send)]
impl Symbol for TestSymbol {
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
Ok(self.reached)
}
async fn execute(&self) -> Result<(), Box<dyn Error>> {
Ok(())
}
}
#[allow(clippy::type_complexity)]
fn get_setup() -> (
Rc<RefCell<usize>>,
Setup<
TestSymbolRunner,
StoringLogger,
TestResourceLocator,
TestImplementationBuilder,
Resources,
Artifacts,
>,
StoringLogger,
) {
let count = Rc::new(RefCell::new(0));
let runner = TestSymbolRunner {
count: Rc::clone(&count),
};
let logger = StoringLogger::new();
(count, Setup::new_with(runner, logger.clone()), logger)
}
#[test]
fn correctly_uses_force() {
run(async {
let (count, setup, _) = get_setup();
setup.add(TestResource("A", "b")).await.unwrap();
assert_eq!(*count.borrow(), 2);
setup.add(TestResource("A", "b")).await.unwrap();
assert_eq!(*count.borrow(), 2);
let (count, setup, _) = get_setup();
setup.add(TestResource("A", "B")).await.unwrap();
assert_eq!(*count.borrow(), 0);
});
}
#[test]
fn run_reached_symbol() {
run(async {
let (count, setup, log) = get_setup();
let did_run = setup
.run_symbol(TestSymbol { reached: true }, false)
.await
.unwrap();
drop(setup);
assert!(!did_run);
assert_eq!(*count.borrow(), 0);
assert_eq!(log.release(), vec![Entry(3, ".".into())]);
});
}
#[test]
fn run_not_reached_symbol() {
run(async {
let (count, setup, log) = get_setup();
let did_run = setup
.run_symbol(TestSymbol { reached: false }, false)
.await
.unwrap();
drop(setup);
assert!(did_run);
assert_eq!(*count.borrow(), 1);
let log = log.release();
assert_eq!(log.len(), 1);
assert_eq!(log[0].0, 3);
let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap();
assert!(re.is_match(&log[0].1));
});
}
use super::{ActualSetup, AddGeneric, Cache, Realizer};
#[test]
fn empty_tuple_add_generic() {
let setup = Rc::new_cyclic(|inner| {
ActualSetup(Cache::<Realizer<(), (), (), _>, (), ()>::new(
Realizer::new(Rc::new(()), inner.clone()),
))
});
run(async {
assert!(setup
.add_generic(
&Rc::new(slog::Logger::root(slog::Discard, slog::o!())),
(),
false
)
.await
.is_ok());
})
}
}

74
src/setup/realizer.rs

@ -0,0 +1,74 @@
use super::{Add, AddGeneric, AddResult, AddableResource, Runnable, SymbolRunner};
use crate::{ImplementationBuilder, ResourceLocator};
use async_trait::async_trait;
use slog::{debug, o, trace, Logger};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
#[derive(Debug)]
pub struct Realizer<SR, L, B, S> {
symbol_runner: Rc<SR>,
outer: Weak<S>,
phantom: PhantomData<(L, B)>,
}
impl<SR, L, B, S> Realizer<SR, L, B, S> {
pub fn new(symbol_runner: Rc<SR>, outer: Weak<S>) -> Self {
Self {
symbol_runner,
outer,
phantom: PhantomData::default(),
}
}
}
#[async_trait(?Send)]
impl<R, SR, L, B, S> Add<R> for Realizer<SR, L, B, S>
where
R: AddableResource,
SR: SymbolRunner,
L: ResourceLocator<R>,
B: ImplementationBuilder<R>,
<B as ImplementationBuilder<R>>::Implementation: Runnable + Debug,
S: AddGeneric<B::Prerequisites> + AddGeneric<<L as ResourceLocator<R>>::Prerequisites>,
{
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
let setup = self.outer.upgrade().unwrap();
let logger = Rc::new(logger.new(o!("resource" => format!("{resource:?}"))));
trace!(logger, "(force_run is {})", force_run);
let (location, location_prereqs) = L::locate(&resource);
trace!(logger, "Adding location prereqs ...");
let (_, location_prereqs_did_run) = (*setup)
.add_generic(&logger, location_prereqs, false)
.await?;
trace!(
logger,
"Location prereqs did_run: {}",
location_prereqs_did_run
);
trace!(logger, "Adding implementation prereqs ...");
let (prereqs, prereqs_did_run) = (*setup)
.add_generic(&logger, B::prerequisites(&resource), false)
.await?;
trace!(
logger,
"Implementation prereqs did_run: {}",
prereqs_did_run
);
trace!(logger, "Running implementation ...");
let implementation = B::create(&resource, &location, prereqs);
let did_run = implementation
.run(
&*self.symbol_runner,
&logger,
force_run || location_prereqs_did_run || prereqs_did_run,
)
.await?;
debug!(logger, "done.");
Ok((location, did_run))
}
}

390
src/setup/setup.rs

@ -1,390 +0,0 @@
use super::core::{RegularSetupCore, SetupCore};
use super::util::{AddResult, AddableResource, Recorder};
use super::SymbolRunner;
use crate::async_utils::sleep;
use crate::loggers::Logger;
use crate::resources::{DefaultArtifacts, DefaultResources, FromArtifact, FromResource};
use crate::symbols::Symbol;
use crate::{DefaultBuilder, DefaultLocator};
use futures_util::future::{FutureExt, Shared};
use slog::{o, trace, Logger as SlogLogger};
use std::cell::RefCell;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
type Cache<Rs, As> = HashMap<Rs, Shared<Pin<Box<dyn Future<Output = (As, bool)>>>>>;
#[derive(Debug)]
struct SetupInner<CORE, Rs, As> {
core: CORE,
resources: RefCell<Cache<Rs, As>>,
}
#[derive(Debug)]
pub struct Setup<SR, L, B, Rs, As>(Rc<SetupInner<RegularSetupCore<SR, L, B>, Rs, As>>);
impl<SR, L, B, Rs, As> Setup<SR, L, B, Rs, As> {
pub fn new(symbol_runner: SR) -> Self {
Self(Rc::new(SetupInner {
core: RegularSetupCore::new(symbol_runner),
resources: RefCell::default(),
}))
}
}
impl<L: 'static, B: 'static, SR: 'static, Rs: Hash + Eq + 'static, As: 'static>
Setup<SR, L, B, Rs, As>
{
// FIXME: https://github.com/rust-lang/rust-clippy/issues/6353
#[allow(clippy::await_holding_refcell_ref)]
pub async fn add<R: AddableResource>(
&self,
logger: &Rc<SlogLogger>,
resource: R,
force_run: bool,
) -> AddResult<R>
where
Rs: FromResource<R>,
As: FromArtifact<R> + Clone,
RegularSetupCore<SR, L, B>: SetupCore<R, Self>,
{
let (storable_resource, weak_resource) = Rs::from_resource(resource);
let mut resources = self.0.resources.borrow_mut();
let result = if let Some(future) = resources.remove(&storable_resource) {
assert!(
!force_run,
"Forcing to run an already-added resource is a logical error"
);
resources.insert(storable_resource, future.clone());
drop(resources);
trace!(logger, "Resource already added");
Ok(future.await)
} else {
let inner_weak = Rc::downgrade(&self.0);
let logger_weak = Rc::downgrade(logger);
let future = Box::pin(async move {
let this = Self(inner_weak.upgrade().expect("Dangling!"));
let logger = logger_weak.upgrade().expect("Dangling!");
let resource = weak_resource.upgrade().expect("Dangling!");
// Need to convert Box<Error> to String for Clone for Shared
let result = this.0.core.add(&this, &logger, resource, force_run).await;
result
.map(|(t, did_run)| (As::from_artifact(t), did_run))
.map_err(|e| e.to_string())
})
.shared();
let future_clone = future.clone();
resources.insert(
storable_resource,
(Box::pin(async move {
let result = future_clone.await;
if result.is_err() {
// Step back to give the initial caller time to handle the error before unwrapping
sleep(Duration::from_millis(0)).await;
}
result.unwrap()
}) as Pin<Box<dyn Future<Output = (As, bool)>>>)
.shared(),
);
drop(resources);
let result = future.await;
result.map_err(std::convert::Into::into)
};
result.map(|(t, did_run)| (t.into_artifact(), did_run))
}
}
#[derive(Debug)]
pub struct SetupFacade<
SR,
LOG,
L = DefaultLocator,
B = DefaultBuilder,
Rs = DefaultResources<'static, &'static str>,
As = DefaultArtifacts<'static, &'static str>,
>(LOG, Setup<SR, L, B, Rs, As>);
impl<SR, LOG> SetupFacade<SR, LOG> {
pub fn new(symbol_runner: SR, logger: LOG) -> Self {
Self::new_with(symbol_runner, logger)
}
}
impl<L, B, As, SR, LOG, Rs: Hash + Eq> SetupFacade<SR, LOG, L, B, Rs, As> {
pub fn new_with(symbol_runner: SR, logger: LOG) -> Self {
Self(logger, Setup::new(symbol_runner))
}
}
impl<
SR: 'static,
LOG: 'static + Logger,
L: 'static,
B: 'static,
Rs: 'static + Eq + Hash,
As: 'static,
> SetupFacade<SR, LOG, L, B, Rs, As>
{
pub async fn add_force<R: AddableResource>(&self, resource: R, force_run: bool) -> AddResult<R>
where
Rs: FromResource<R>,
As: FromArtifact<R> + Clone,
RegularSetupCore<SR, L, B>: SetupCore<R, Setup<SR, L, B, Rs, As>>,
{
let recorder = Recorder::default();
let result = {
let log = Rc::new(slog::Logger::root(recorder.clone(), o!()));
self.1.add(&log, resource, force_run).await
};
self.log_result(recorder, result.as_ref().map(|(_, did_run)| *did_run));
result
}
pub async fn add<R: AddableResource>(&self, resource: R) -> AddResult<R>
where
Rs: FromResource<R>,
As: FromArtifact<R> + Clone,
R::Artifact: Clone,
RegularSetupCore<SR, L, B>: SetupCore<R, Setup<SR, L, B, Rs, As>>,
{
self.add_force(resource, false).await
}
pub async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: S,
force: bool,
) -> Result<bool, Box<dyn Error>>
where
RegularSetupCore<SR, L, B>: SymbolRunner,
{
let recorder = Recorder::default();
let result = {
let log = Rc::new(slog::Logger::root(
recorder.clone(),
o!("symbol" => format!("{symbol:?}")),
));
(self.1).0.core.run_symbol(&symbol, &log, force).await
};
self.log_result(recorder, result.as_ref().copied());
result
}
fn log_result(&self, recorder: Recorder, result: Result<bool, &Box<dyn Error>>) {
let log = match result {
Ok(false) => String::new(),
Ok(true) => recorder.into_string(slog::Level::Info),
Err(e) => recorder.into_string(slog::Level::Trace),
};
if log.is_empty() {
self.0.write(3, ".");
} else {
self.0.writeln(3, &log);
}
}
}
#[cfg(test)]
mod test {
use super::SymbolRunner;
use crate::async_utils::run;
use crate::loggers::{Entry, StoringLogger};
use crate::resources::{FromArtifact, FromResource, Resource};
use crate::symbols::Symbol;
use crate::to_artifact::ToArtifact;
use crate::{ImplementationBuilder, ResourceLocator, Setup};
use async_trait::async_trait;
use regex::Regex;
use slog::{info, Logger};
use std::cell::RefCell;
use std::error::Error;
use std::fmt::Debug;
use std::rc::{Rc, Weak};
struct TestSymbolRunner {
count: Rc<RefCell<usize>>,
}
#[async_trait(?Send)]
impl SymbolRunner for TestSymbolRunner {
async fn run_symbol<S: Symbol + Debug>(
&self,
symbol: &S,
logger: &Logger,
force: bool,
) -> Result<bool, Box<dyn Error>> {
info!(logger, "run");
let run = force || !symbol.target_reached().await?;
if run {
*self.count.borrow_mut() += 1;
}
Ok(run)
}
}
#[derive(Debug, PartialEq, Eq, Hash)]
struct TestResource<T>(&'static str, T);
impl<T> Resource for TestResource<T> {
type Artifact = ();
}
#[derive(Debug, Hash, PartialEq, Eq)]
enum Resources {
A(Rc<TestResource<&'static str>>),
B(Rc<TestResource<()>>),
}
impl FromResource<TestResource<&'static str>> for Resources {
fn from_resource(from: TestResource<&'static str>) -> (Self, Weak<TestResource<&'static str>>) {
let inner = Rc::new(from);
(Self::A(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
impl FromResource<TestResource<()>> for Resources {
fn from_resource(from: TestResource<()>) -> (Self, Weak<TestResource<()>>) {
let inner = Rc::new(from);
(Self::B(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
#[derive(Clone)]
struct Artifacts;
impl<V> FromArtifact<TestResource<V>> for Artifacts {
fn from_artifact(_from: ()) -> Self {
Self
}
#[allow(clippy::unused_unit)]
fn into_artifact(self) -> () {
#[allow(clippy::unused_unit)]
()
}
}
struct TestResourceLocator;
impl<T> ResourceLocator<TestResource<T>> for TestResourceLocator {
type Prerequisites = ();
fn locate(_resource: &TestResource<T>) -> (<TestResource<T> as ToArtifact>::Artifact, ()) {
((), ())
}
}
struct TestImplementationBuilder;
impl ImplementationBuilder<TestResource<&'static str>> for TestImplementationBuilder {
type Implementation = TestSymbol;
type Prerequisites = TestResource<()>;
fn prerequisites(resource: &TestResource<&'static str>) -> Self::Prerequisites {
TestResource(resource.1, ())
}
fn create(
resource: &TestResource<&'static str>,
(): &(),
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
) -> Self::Implementation {
TestSymbol {
reached: resource.0.chars().next().unwrap().is_uppercase(),
}
}
}
impl ImplementationBuilder<TestResource<()>> for TestImplementationBuilder {
type Implementation = TestSymbol;
type Prerequisites = ();
fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {}
fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation {
TestSymbol {
reached: resource.0.chars().next().unwrap().is_uppercase(),
}
}
}
#[derive(Debug)]
struct TestSymbol {
reached: bool,
}
#[async_trait(?Send)]
impl Symbol for TestSymbol {
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
Ok(self.reached)
}
async fn execute(&self) -> Result<(), Box<dyn Error>> {
Ok(())
}
}
#[allow(clippy::type_complexity)]
fn get_setup() -> (
Rc<RefCell<usize>>,
Setup<
TestSymbolRunner,
StoringLogger,
TestResourceLocator,
TestImplementationBuilder,
Resources,
Artifacts,
>,
StoringLogger,
) {
let count = Rc::new(RefCell::new(0));
let runner = TestSymbolRunner {
count: Rc::clone(&count),
};
let logger = StoringLogger::new();
(count, Setup::new_with(runner, logger.clone()), logger)
}
#[test]
fn correctly_uses_force() {
run(async {
let (count, setup, _) = get_setup();
setup.add(TestResource("A", "b")).await.unwrap();
assert_eq!(*count.borrow(), 2);
setup.add(TestResource("A", "b")).await.unwrap();
assert_eq!(*count.borrow(), 2);
let (count, setup, _) = get_setup();
setup.add(TestResource("A", "B")).await.unwrap();
assert_eq!(*count.borrow(), 0);
});
}
#[test]
fn run_reached_symbol() {
run(async {
let (count, setup, log) = get_setup();
let did_run = setup
.run_symbol(TestSymbol { reached: true }, false)
.await
.unwrap();
drop(setup);
assert!(!did_run);
assert_eq!(*count.borrow(), 0);
assert_eq!(log.release(), vec![Entry(3, ".".into())]);
});
}
#[test]
fn run_not_reached_symbol() {
run(async {
let (count, setup, log) = get_setup();
let did_run = setup
.run_symbol(TestSymbol { reached: false }, false)
.await
.unwrap();
drop(setup);
assert!(did_run);
assert_eq!(*count.borrow(), 1);
let log = log.release();
assert_eq!(log.len(), 1);
assert_eq!(log[0].0, 3);
let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap();
assert!(re.is_match(&log[0].1));
});
}
}

50
src/setup/util.rs

@ -1,6 +1,8 @@
use crate::async_utils::join;
use crate::resources::Resource; use crate::resources::Resource;
use crate::to_artifact::ToArtifact; use crate::to_artifact::ToArtifact;
use slog::{Drain, Filter, OwnedKVList, Record};
use async_trait::async_trait;
use slog::{Drain, Filter, Logger, OwnedKVList, Record};
use slog_async::AsyncRecord; use slog_async::AsyncRecord;
use std::cell::RefCell; use std::cell::RefCell;
use std::error::Error; use std::error::Error;
@ -14,6 +16,52 @@ impl<R> AddableResource for R where R: 'static + Resource + Debug {}
pub type AddResult<R> = Result<(<R as ToArtifact>::Artifact, bool), Box<dyn Error>>; pub type AddResult<R> = Result<(<R as ToArtifact>::Artifact, bool), Box<dyn Error>>;
#[async_trait(?Send)]
pub trait Add<R: AddableResource> {
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R>;
}
#[async_trait(?Send)]
pub trait AddGeneric<X: ToArtifact> {
async fn add_generic(&self, logger: &Rc<Logger>, x: X, force_run: bool) -> AddResult<X>;
}
macro_rules! add_generic {
( $($name:ident)* ) => (
#[async_trait(?Send)]
#[allow(non_snake_case)]
impl<_S, $($name: AddableResource,)*>
AddGeneric<($($name,)*)> for _S
where
$(
_S: AddGeneric<$name>
),*
{
#[allow(unused, clippy::shadow_unrelated)]
async fn add_generic(&self, logger: &Rc<Logger>, ($($name,)*): ($($name,)*), force_run: bool) -> AddResult<($($name,)*)>
{
let ($($name,)*) = join!($(self.add_generic(logger, $name, force_run),)*);
let mut did_run_any = false;
$(
let (artifact, did_run) = $name?;
did_run_any = did_run_any || did_run;
let $name = artifact;
)*
Ok((($($name,)*), did_run_any))
}
}
);
}
for_each_tuple!(add_generic);
#[async_trait(?Send)]
impl<R: AddableResource + Debug, S: Add<R>> AddGeneric<R> for S {
async fn add_generic(&self, logger: &Rc<Logger>, r: R, force_run: bool) -> AddResult<R> {
self.add(logger, Rc::new(r), force_run).await
}
}
// From https://users.rust-lang.org/t/how-to-send-a-writer-into-a-thread/4965/10 // From https://users.rust-lang.org/t/how-to-send-a-writer-into-a-thread/4965/10
#[derive(Clone)] #[derive(Clone)]
struct Output<W>(Rc<RefCell<W>>); struct Output<W>(Rc<RefCell<W>>);

Loading…
Cancel
Save