Avoid panic with failure on multiple tree levels
This commit is contained in:
parent
93dba46387
commit
988a3b0fe7
2 changed files with 134 additions and 57 deletions
|
|
@ -1,5 +1,4 @@
|
||||||
use super::{Add, AddResult, AddableResource};
|
use super::{Add, AddResult, AddableResource};
|
||||||
use crate::async_utils::sleep;
|
|
||||||
use crate::resources::{FromArtifact, FromResource};
|
use crate::resources::{FromArtifact, FromResource};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures_util::future::{FutureExt, Shared};
|
use futures_util::future::{FutureExt, Shared};
|
||||||
|
|
@ -11,9 +10,10 @@ use std::future::Future;
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
type ResourceCache<Rs, As> = HashMap<Rs, Shared<Pin<Box<dyn Future<Output = (As, bool)>>>>>;
|
// FIXME: Switch Error to Rc
|
||||||
|
type ResourceCache<Rs, As> =
|
||||||
|
HashMap<Rs, Shared<Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>>>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Cache<I, Rs, As> {
|
pub struct Cache<I, Rs, As> {
|
||||||
|
|
@ -42,19 +42,17 @@ where
|
||||||
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
|
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
|
||||||
let (storable_resource, weak_resource) = Rs::from_resource(&resource);
|
let (storable_resource, weak_resource) = Rs::from_resource(&resource);
|
||||||
let mut resources = self.resources.borrow_mut();
|
let mut resources = self.resources.borrow_mut();
|
||||||
let result = if let Some(future) = resources.remove(&storable_resource) {
|
let future = if let Some(future) = resources.get(&storable_resource) {
|
||||||
assert!(
|
assert!(
|
||||||
!force_run,
|
!force_run,
|
||||||
"Forcing to run an already-added resource is a logical error"
|
"Forcing to run an already-added resource is a logical error"
|
||||||
);
|
);
|
||||||
resources.insert(storable_resource, future.clone());
|
|
||||||
drop(resources);
|
|
||||||
trace!(logger, "Resource already added");
|
trace!(logger, "Resource already added");
|
||||||
Ok(future.await)
|
future.clone()
|
||||||
} else {
|
} else {
|
||||||
let inner_weak = Rc::downgrade(&self.inner);
|
let inner_weak = Rc::downgrade(&self.inner);
|
||||||
let logger_weak = Rc::downgrade(logger);
|
let logger_weak = Rc::downgrade(logger);
|
||||||
let future = Box::pin(async move {
|
let future = (Box::pin(async move {
|
||||||
let inner = inner_weak.upgrade().expect("Dangling!");
|
let inner = inner_weak.upgrade().expect("Dangling!");
|
||||||
let logger = logger_weak.upgrade().expect("Dangling!");
|
let logger = logger_weak.upgrade().expect("Dangling!");
|
||||||
let resource = weak_resource.upgrade().expect("Dangling!");
|
let resource = weak_resource.upgrade().expect("Dangling!");
|
||||||
|
|
@ -64,25 +62,15 @@ where
|
||||||
result
|
result
|
||||||
.map(|(t, did_run)| (As::from_artifact(t), did_run))
|
.map(|(t, did_run)| (As::from_artifact(t), did_run))
|
||||||
.map_err(|e| e.to_string())
|
.map_err(|e| e.to_string())
|
||||||
})
|
}) as Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>)
|
||||||
.shared();
|
.shared();
|
||||||
let future_clone = future.clone();
|
resources.insert(storable_resource, future.clone());
|
||||||
resources.insert(
|
future
|
||||||
storable_resource,
|
|
||||||
(Box::pin(async move {
|
|
||||||
let result = future_clone.await;
|
|
||||||
if result.is_err() {
|
|
||||||
// Step back to give the initial caller time to handle the error before unwrapping
|
|
||||||
sleep(Duration::from_millis(0)).await;
|
|
||||||
}
|
|
||||||
result.unwrap()
|
|
||||||
}) as Pin<Box<dyn Future<Output = (As, bool)>>>)
|
|
||||||
.shared(),
|
|
||||||
);
|
|
||||||
drop(resources);
|
|
||||||
let result = future.await;
|
|
||||||
result.map_err(std::convert::Into::into)
|
|
||||||
};
|
};
|
||||||
result.map(|(t, did_run)| (t.into_artifact(), did_run))
|
drop(resources);
|
||||||
|
future
|
||||||
|
.await
|
||||||
|
.map(|(t, did_run)| (t.into_artifact(), did_run))
|
||||||
|
.map_err(std::convert::Into::into)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
151
src/setup/mod.rs
151
src/setup/mod.rs
|
|
@ -191,21 +191,40 @@ mod test {
|
||||||
|
|
||||||
#[derive(Debug, Hash, PartialEq, Eq)]
|
#[derive(Debug, Hash, PartialEq, Eq)]
|
||||||
enum Resources {
|
enum Resources {
|
||||||
A(Rc<TestResource<&'static str>>),
|
A(Rc<TestResource<()>>),
|
||||||
B(Rc<TestResource<()>>),
|
B(Rc<TestResource<&'static str>>),
|
||||||
|
C(Rc<TestResource<(&'static str, &'static str)>>),
|
||||||
|
D(Rc<TestResource<((&'static str, &'static str), &'static str)>>),
|
||||||
|
}
|
||||||
|
impl FromResource<TestResource<()>> for Resources {
|
||||||
|
fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
|
||||||
|
(Self::A(Rc::clone(&inner)), Rc::downgrade(&inner))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
impl FromResource<TestResource<&'static str>> for Resources {
|
impl FromResource<TestResource<&'static str>> for Resources {
|
||||||
fn from_resource(
|
fn from_resource(
|
||||||
inner: &Rc<TestResource<&'static str>>,
|
inner: &Rc<TestResource<&'static str>>,
|
||||||
) -> (Self, Weak<TestResource<&'static str>>) {
|
) -> (Self, Weak<TestResource<&'static str>>) {
|
||||||
(Self::A(Rc::clone(&inner)), Rc::downgrade(&inner))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl FromResource<TestResource<()>> for Resources {
|
|
||||||
fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
|
|
||||||
(Self::B(Rc::clone(&inner)), Rc::downgrade(&inner))
|
(Self::B(Rc::clone(&inner)), Rc::downgrade(&inner))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl FromResource<TestResource<(&'static str, &'static str)>> for Resources {
|
||||||
|
fn from_resource(
|
||||||
|
inner: &Rc<TestResource<(&'static str, &'static str)>>,
|
||||||
|
) -> (Self, Weak<TestResource<(&'static str, &'static str)>>) {
|
||||||
|
(Self::C(Rc::clone(&inner)), Rc::downgrade(&inner))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl FromResource<TestResource<((&'static str, &'static str), &'static str)>> for Resources {
|
||||||
|
fn from_resource(
|
||||||
|
inner: &Rc<TestResource<((&'static str, &'static str), &'static str)>>,
|
||||||
|
) -> (
|
||||||
|
Self,
|
||||||
|
Weak<TestResource<((&'static str, &'static str), &'static str)>>,
|
||||||
|
) {
|
||||||
|
(Self::D(Rc::clone(&inner)), Rc::downgrade(&inner))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Artifacts;
|
struct Artifacts;
|
||||||
|
|
@ -228,7 +247,78 @@ mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct TestSymbol {
|
||||||
|
result: Result<bool, Box<str>>,
|
||||||
|
}
|
||||||
|
impl TestSymbol {
|
||||||
|
fn new(def: &str) -> Self {
|
||||||
|
let first_char = def.chars().next().unwrap();
|
||||||
|
Self {
|
||||||
|
result: if first_char == '!' {
|
||||||
|
Err(def.into())
|
||||||
|
} else {
|
||||||
|
Ok(first_char.is_uppercase())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl Symbol for TestSymbol {
|
||||||
|
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
|
||||||
|
self.result.clone().map_err(|s| s.to_string().into())
|
||||||
|
}
|
||||||
|
async fn execute(&self) -> Result<(), Box<dyn Error>> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct TestImplementationBuilder;
|
struct TestImplementationBuilder;
|
||||||
|
|
||||||
|
impl ImplementationBuilder<TestResource<((&'static str, &'static str), &'static str)>>
|
||||||
|
for TestImplementationBuilder
|
||||||
|
{
|
||||||
|
type Implementation = TestSymbol;
|
||||||
|
type Prerequisites = (TestResource<(&'static str, &'static str)>, TestResource<()>);
|
||||||
|
|
||||||
|
fn prerequisites(
|
||||||
|
resource: &TestResource<((&'static str, &'static str), &'static str)>,
|
||||||
|
) -> Self::Prerequisites {
|
||||||
|
(
|
||||||
|
TestResource("complex_resource", (resource.1).0), // FIXME: Only one of these can exist
|
||||||
|
TestResource((resource.1).1, ()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fn create(
|
||||||
|
resource: &TestResource<((&'static str, &'static str), &'static str)>,
|
||||||
|
(): &(),
|
||||||
|
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
|
||||||
|
) -> Self::Implementation {
|
||||||
|
TestSymbol::new(resource.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ImplementationBuilder<TestResource<(&'static str, &'static str)>>
|
||||||
|
for TestImplementationBuilder
|
||||||
|
{
|
||||||
|
type Implementation = TestSymbol;
|
||||||
|
type Prerequisites = (TestResource<()>, TestResource<()>);
|
||||||
|
|
||||||
|
fn prerequisites(resource: &TestResource<(&'static str, &'static str)>) -> Self::Prerequisites {
|
||||||
|
(
|
||||||
|
TestResource((resource.1).0, ()),
|
||||||
|
TestResource((resource.1).1, ()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fn create(
|
||||||
|
resource: &TestResource<(&'static str, &'static str)>,
|
||||||
|
(): &(),
|
||||||
|
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
|
||||||
|
) -> Self::Implementation {
|
||||||
|
TestSymbol::new(resource.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ImplementationBuilder<TestResource<&'static str>> for TestImplementationBuilder {
|
impl ImplementationBuilder<TestResource<&'static str>> for TestImplementationBuilder {
|
||||||
type Implementation = TestSymbol;
|
type Implementation = TestSymbol;
|
||||||
type Prerequisites = TestResource<()>;
|
type Prerequisites = TestResource<()>;
|
||||||
|
|
@ -241,35 +331,17 @@ mod test {
|
||||||
(): &(),
|
(): &(),
|
||||||
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
|
_inputs: <Self::Prerequisites as ToArtifact>::Artifact,
|
||||||
) -> Self::Implementation {
|
) -> Self::Implementation {
|
||||||
TestSymbol {
|
TestSymbol::new(resource.0)
|
||||||
reached: resource.0.chars().next().unwrap().is_uppercase(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ImplementationBuilder<TestResource<()>> for TestImplementationBuilder {
|
impl ImplementationBuilder<TestResource<()>> for TestImplementationBuilder {
|
||||||
type Implementation = TestSymbol;
|
type Implementation = TestSymbol;
|
||||||
type Prerequisites = ();
|
type Prerequisites = ();
|
||||||
|
|
||||||
fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {}
|
fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {}
|
||||||
fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation {
|
fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation {
|
||||||
TestSymbol {
|
TestSymbol::new(resource.0)
|
||||||
reached: resource.0.chars().next().unwrap().is_uppercase(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct TestSymbol {
|
|
||||||
reached: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait(?Send)]
|
|
||||||
impl Symbol for TestSymbol {
|
|
||||||
async fn target_reached(&self) -> Result<bool, Box<dyn Error>> {
|
|
||||||
Ok(self.reached)
|
|
||||||
}
|
|
||||||
async fn execute(&self) -> Result<(), Box<dyn Error>> {
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -306,12 +378,28 @@ mod test {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn failing_dependencies_deadlock() {
|
||||||
|
run(async {
|
||||||
|
let (count, setup, _) = get_setup();
|
||||||
|
assert_eq!(
|
||||||
|
setup
|
||||||
|
.add(TestResource("a", (("b", "!x"), "!x")))
|
||||||
|
.await
|
||||||
|
.unwrap_err()
|
||||||
|
.to_string(),
|
||||||
|
"!x"
|
||||||
|
);
|
||||||
|
assert_eq!(*count.borrow(), 1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn run_reached_symbol() {
|
fn run_reached_symbol() {
|
||||||
run(async {
|
run(async {
|
||||||
let (count, setup, log) = get_setup();
|
let (count, setup, log) = get_setup();
|
||||||
let did_run = setup
|
let did_run = setup
|
||||||
.run_symbol(TestSymbol { reached: true }, false)
|
.run_symbol(TestSymbol { result: Ok(true) }, false)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
drop(setup);
|
drop(setup);
|
||||||
|
|
@ -326,7 +414,7 @@ mod test {
|
||||||
run(async {
|
run(async {
|
||||||
let (count, setup, log) = get_setup();
|
let (count, setup, log) = get_setup();
|
||||||
let did_run = setup
|
let did_run = setup
|
||||||
.run_symbol(TestSymbol { reached: false }, false)
|
.run_symbol(TestSymbol { result: Ok(false) }, false)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
drop(setup);
|
drop(setup);
|
||||||
|
|
@ -335,7 +423,8 @@ mod test {
|
||||||
let log = log.release();
|
let log = log.release();
|
||||||
assert_eq!(log.len(), 1);
|
assert_eq!(log.len(), 1);
|
||||||
assert_eq!(log[0].0, 3);
|
assert_eq!(log[0].0, 3);
|
||||||
let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap();
|
let re = Regex::new(r"^symbol: TestSymbol \{ result: Ok\(false\) \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap();
|
||||||
|
|
||||||
assert!(re.is_match(&log[0].1));
|
assert!(re.is_match(&log[0].1));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue