You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
4.6 KiB
153 lines
4.6 KiB
use super::{Add, AddResult, AddableResource};
|
|
use crate::resources::{FromArtifact, FromResource};
|
|
use async_trait::async_trait;
|
|
use futures_util::future::{FutureExt, Shared};
|
|
use slog::{trace, Logger};
|
|
use std::cell::RefCell;
|
|
use std::collections::HashMap;
|
|
use std::fmt::Debug;
|
|
use std::future::Future;
|
|
use std::hash::Hash;
|
|
use std::pin::Pin;
|
|
use std::rc::Rc;
|
|
|
|
// FIXME: Switch Error to Rc
|
|
type ResourceCache<Rs, As> =
|
|
HashMap<Rs, Shared<Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>>>;
|
|
|
|
#[derive(Debug)]
|
|
pub struct Cache<I, Rs, As> {
|
|
resources: RefCell<ResourceCache<Rs, As>>,
|
|
inner: Rc<I>,
|
|
}
|
|
|
|
impl<I, Rs, As> Cache<I, Rs, As> {
|
|
pub fn new(inner: I) -> Self {
|
|
Self {
|
|
resources: RefCell::default(),
|
|
inner: Rc::new(inner),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait(?Send)]
|
|
impl<R: AddableResource + Debug, I, Rs, As> Add<R> for Cache<I, Rs, As>
|
|
where
|
|
Rs: Hash + Eq + 'static + FromResource<R>,
|
|
As: 'static + FromArtifact<R> + Clone,
|
|
I: 'static + Add<R>,
|
|
{
|
|
// FIXME: https://github.com/rust-lang/rust-clippy/issues/6353
|
|
#[allow(clippy::await_holding_refcell_ref)]
|
|
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
|
|
let (storable_resource, weak_resource) = Rs::from_resource(&resource);
|
|
let mut resources = self.resources.borrow_mut();
|
|
let future = if let Some(future) = resources.get(&storable_resource) {
|
|
assert!(
|
|
!force_run,
|
|
"Forcing to run an already-added resource is a logical error"
|
|
);
|
|
trace!(logger, "Resource already added");
|
|
future.clone()
|
|
} else {
|
|
let inner_weak = Rc::downgrade(&self.inner);
|
|
let logger_weak = Rc::downgrade(logger);
|
|
let future = (Box::pin(async move {
|
|
let inner = inner_weak.upgrade().expect("Dangling!");
|
|
let logger = logger_weak.upgrade().expect("Dangling!");
|
|
let resource = weak_resource.upgrade().expect("Dangling!");
|
|
let result = inner.add(&logger, Rc::clone(&resource), force_run).await;
|
|
|
|
// Need to convert Box<Error> to String for Clone for Shared
|
|
result
|
|
.map(|(t, did_run)| (As::from_artifact(t), did_run))
|
|
.map_err(|e| e.to_string())
|
|
}) as Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>)
|
|
.shared();
|
|
resources.insert(storable_resource, future.clone());
|
|
future
|
|
};
|
|
drop(resources);
|
|
future
|
|
.await
|
|
.map(|(t, did_run)| (t.into_artifact(), did_run))
|
|
.map_err(std::convert::Into::into)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod test {
    use super::{Add, AddResult, Cache, Logger};
    use crate::async_utils::{join, run};
    use crate::resources::{FromArtifact, FromResource, Resource};
    use async_trait::async_trait;
    use std::fmt::Debug;
    use std::rc::{Rc, Weak};

    /// Minimal resource: a name plus an arbitrary payload, producing a unit artifact.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct TestResource<T>(&'static str, T);
    impl<T> Resource for TestResource<T> {
        type Artifact = ();
    }

    /// Cache key type covering both payload flavours of `TestResource`.
    #[derive(Debug, Hash, PartialEq, Eq)]
    enum Resources {
        A(Rc<TestResource<&'static str>>),
        B(Rc<TestResource<()>>),
    }
    impl FromResource<TestResource<&'static str>> for Resources {
        fn from_resource(
            inner: &Rc<TestResource<&'static str>>,
        ) -> (Self, Weak<TestResource<&'static str>>) {
            // `inner` is already a `&Rc<_>`, so it is passed on without re-borrowing.
            (Self::A(Rc::clone(inner)), Rc::downgrade(inner))
        }
    }
    impl FromResource<TestResource<()>> for Resources {
        fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
            (Self::B(Rc::clone(inner)), Rc::downgrade(inner))
        }
    }

    /// Stored artifact type; carries no data because every test artifact is `()`.
    #[derive(Clone)]
    struct Artifacts;
    impl<V> FromArtifact<TestResource<V>> for Artifacts {
        fn from_artifact(_from: ()) -> Self {
            Self
        }
        // The unit return type is spelled out explicitly, which clippy would otherwise flag;
        // this function-level allow also covers the `()` in the body.
        #[allow(clippy::unused_unit)]
        fn into_artifact(self) -> () {
            ()
        }
    }

    /// Stub implementation wrapped by the cache under test.
    struct Inner;

    #[async_trait(?Send)]
    impl<T: Debug + 'static> Add<TestResource<T>> for Inner {
        /// Reports `did_run == true` for every resource except the one named `"reached"`.
        /// The logger and the force flag are irrelevant to the stub, hence the underscores.
        async fn add(
            &self,
            _logger: &Rc<Logger>,
            resource: Rc<TestResource<T>>,
            _force_run: bool,
        ) -> AddResult<TestResource<T>> {
            Ok(((), resource.0 != "reached"))
        }
    }

    /// Adds three distinct resources concurrently and checks that each result is whatever
    /// the stub reports for that resource.
    #[test]
    fn test() {
        let log = Rc::new(slog::Logger::root(slog::Discard, slog::o!()));

        let cache: Cache<_, Resources, Artifacts> = Cache::new(Inner);
        run(async {
            let reached = cache.add(&log, Rc::new(TestResource("reached", ())), false);
            let a = cache.add(&log, Rc::new(TestResource("a", ())), false);
            let b = cache.add(&log, Rc::new(TestResource("b", ())), false);
            let (reached, a, b) = join!(reached, a, b);
            assert_eq!(((), false), reached.unwrap());
            assert_eq!(((), true), a.unwrap());
            assert_eq!(((), true), b.unwrap());
        })
    }
}
|