From 988a3b0fe72bdf15cb2e705ccb833a6451fd2024 Mon Sep 17 00:00:00 2001 From: Adrian Heine Date: Mon, 11 Sep 2023 10:53:01 +0200 Subject: [PATCH] Avoid panic with failure on multiple tree levels --- src/setup/cache.rs | 42 +++++-------- src/setup/mod.rs | 149 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 134 insertions(+), 57 deletions(-) diff --git a/src/setup/cache.rs b/src/setup/cache.rs index 1e28287..8ccb531 100644 --- a/src/setup/cache.rs +++ b/src/setup/cache.rs @@ -1,5 +1,4 @@ use super::{Add, AddResult, AddableResource}; -use crate::async_utils::sleep; use crate::resources::{FromArtifact, FromResource}; use async_trait::async_trait; use futures_util::future::{FutureExt, Shared}; @@ -11,9 +10,10 @@ use std::future::Future; use std::hash::Hash; use std::pin::Pin; use std::rc::Rc; -use std::time::Duration; -type ResourceCache = HashMap>>>>; +// FIXME: Switch Error to Rc +type ResourceCache = + HashMap>>>>>; #[derive(Debug)] pub struct Cache { @@ -42,19 +42,17 @@ where async fn add(&self, logger: &Rc, resource: Rc, force_run: bool) -> AddResult { let (storable_resource, weak_resource) = Rs::from_resource(&resource); let mut resources = self.resources.borrow_mut(); - let result = if let Some(future) = resources.remove(&storable_resource) { + let future = if let Some(future) = resources.get(&storable_resource) { assert!( !force_run, "Forcing to run an already-added resource is a logical error" ); - resources.insert(storable_resource, future.clone()); - drop(resources); trace!(logger, "Resource already added"); - Ok(future.await) + future.clone() } else { let inner_weak = Rc::downgrade(&self.inner); let logger_weak = Rc::downgrade(logger); - let future = Box::pin(async move { + let future = (Box::pin(async move { let inner = inner_weak.upgrade().expect("Dangling!"); let logger = logger_weak.upgrade().expect("Dangling!"); let resource = weak_resource.upgrade().expect("Dangling!"); @@ -64,25 +62,15 @@ where result .map(|(t, did_run)| (As::from_artifact(t), did_run)) .map_err(|e| e.to_string()) - }) - .shared(); - let future_clone = future.clone(); - resources.insert( - storable_resource, - (Box::pin(async move { - let result = future_clone.await; - if result.is_err() { - // Step back to give the initial caller time to handle the error before unwrapping - sleep(Duration::from_millis(0)).await; - } - result.unwrap() - }) as Pin>>) - .shared(), - ); - drop(resources); - let result = future.await; - result.map_err(std::convert::Into::into) + }) as Pin>>>) + .shared(); + resources.insert(storable_resource, future.clone()); + future }; - result.map(|(t, did_run)| (t.into_artifact(), did_run)) + drop(resources); + future + .await + .map(|(t, did_run)| (t.into_artifact(), did_run)) + .map_err(std::convert::Into::into) } } diff --git a/src/setup/mod.rs b/src/setup/mod.rs index e90ebf0..e1f3722 100644 --- a/src/setup/mod.rs +++ b/src/setup/mod.rs @@ -191,19 +191,38 @@ mod test { #[derive(Debug, Hash, PartialEq, Eq)] enum Resources { - A(Rc>), - B(Rc>), + A(Rc>), + B(Rc>), + C(Rc>), + D(Rc>), + } + impl FromResource> for Resources { + fn from_resource(inner: &Rc>) -> (Self, Weak>) { + (Self::A(Rc::clone(&inner)), Rc::downgrade(&inner)) + } } impl FromResource> for Resources { fn from_resource( inner: &Rc>, ) -> (Self, Weak>) { - (Self::A(Rc::clone(&inner)), Rc::downgrade(&inner)) + (Self::B(Rc::clone(&inner)), Rc::downgrade(&inner)) } } - impl FromResource> for Resources { - fn from_resource(inner: &Rc>) -> (Self, Weak>) { - (Self::B(Rc::clone(&inner)), Rc::downgrade(&inner)) + impl FromResource> for Resources { + fn from_resource( + inner: &Rc>, + ) -> (Self, Weak>) { + (Self::C(Rc::clone(&inner)), Rc::downgrade(&inner)) + } + } + impl FromResource> for Resources { + fn from_resource( + inner: &Rc>, + ) -> ( + Self, + Weak>, + ) { + (Self::D(Rc::clone(&inner)), Rc::downgrade(&inner)) } } @@ -228,7 +247,78 @@ mod test { } } + #[derive(Debug)] + struct TestSymbol { + result: Result>, + } + impl TestSymbol { + fn new(def: &str) -> Self { + let first_char = def.chars().next().unwrap(); + Self { + result: if first_char == '!' { + Err(def.into()) + } else { + Ok(first_char.is_uppercase()) + }, + } + } + } + #[async_trait(?Send)] + impl Symbol for TestSymbol { + async fn target_reached(&self) -> Result> { + self.result.clone().map_err(|s| s.to_string().into()) + } + async fn execute(&self) -> Result<(), Box> { + Ok(()) + } + } + struct TestImplementationBuilder; + + impl ImplementationBuilder> + for TestImplementationBuilder + { + type Implementation = TestSymbol; + type Prerequisites = (TestResource<(&'static str, &'static str)>, TestResource<()>); + + fn prerequisites( + resource: &TestResource<((&'static str, &'static str), &'static str)>, + ) -> Self::Prerequisites { + ( + TestResource("complex_resource", (resource.1).0), // FIXME: Only one of these can exist + TestResource((resource.1).1, ()), + ) + } + fn create( + resource: &TestResource<((&'static str, &'static str), &'static str)>, + (): &(), + _inputs: ::Artifact, + ) -> Self::Implementation { + TestSymbol::new(resource.0) + } + } + + impl ImplementationBuilder> + for TestImplementationBuilder + { + type Implementation = TestSymbol; + type Prerequisites = (TestResource<()>, TestResource<()>); + + fn prerequisites(resource: &TestResource<(&'static str, &'static str)>) -> Self::Prerequisites { + ( + TestResource((resource.1).0, ()), + TestResource((resource.1).1, ()), + ) + } + fn create( + resource: &TestResource<(&'static str, &'static str)>, + (): &(), + _inputs: ::Artifact, + ) -> Self::Implementation { + TestSymbol::new(resource.0) + } + } + impl ImplementationBuilder> for TestImplementationBuilder { type Implementation = TestSymbol; type Prerequisites = TestResource<()>; @@ -241,35 +331,17 @@ mod test { (): &(), _inputs: ::Artifact, ) -> Self::Implementation { - TestSymbol { - reached: resource.0.chars().next().unwrap().is_uppercase(), - } + TestSymbol::new(resource.0) } } + impl ImplementationBuilder> for TestImplementationBuilder { type Implementation = TestSymbol; type Prerequisites = (); fn prerequisites(_resource: &TestResource<()>) -> Self::Prerequisites {} fn create(resource: &TestResource<()>, (): &(), (): ()) -> Self::Implementation { - TestSymbol { - reached: resource.0.chars().next().unwrap().is_uppercase(), - } - } - } - - #[derive(Debug)] - struct TestSymbol { - reached: bool, - } - - #[async_trait(?Send)] - impl Symbol for TestSymbol { - async fn target_reached(&self) -> Result> { - Ok(self.reached) - } - async fn execute(&self) -> Result<(), Box> { - Ok(()) + TestSymbol::new(resource.0) } } @@ -306,12 +378,28 @@ mod test { }); } + #[test] + fn failing_dependencies_deadlock() { + run(async { + let (count, setup, _) = get_setup(); + assert_eq!( + setup + .add(TestResource("a", (("b", "!x"), "!x"))) + .await + .unwrap_err() + .to_string(), + "!x" + ); + assert_eq!(*count.borrow(), 1); + }); + } + #[test] fn run_reached_symbol() { run(async { let (count, setup, log) = get_setup(); let did_run = setup - .run_symbol(TestSymbol { reached: true }, false) + .run_symbol(TestSymbol { result: Ok(true) }, false) .await .unwrap(); drop(setup); @@ -326,7 +414,7 @@ mod test { run(async { let (count, setup, log) = get_setup(); let did_run = setup - .run_symbol(TestSymbol { reached: false }, false) + .run_symbol(TestSymbol { result: Ok(false) }, false) .await .unwrap(); drop(setup); @@ -335,7 +423,8 @@ mod test { let log = log.release(); assert_eq!(log.len(), 1); assert_eq!(log[0].0, 3); - let re = Regex::new(r"^symbol: TestSymbol \{ reached: false \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap(); + let re = Regex::new(r"^symbol: TestSymbol \{ result: Ok\(false\) \}\n \w+ \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} INFO run\n$").unwrap(); + assert!(re.is_match(&log[0].1)); }); }