A library for writing host-specific, single-binary configuration management and deployment tools
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
4.6 KiB

use super::{Add, AddResult, AddableResource};
use crate::resources::{FromArtifact, FromResource};
use async_trait::async_trait;
use futures_util::future::{FutureExt, Shared};
use slog::{trace, Logger};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::rc::Rc;
// FIXME: Switch Error to Rc
type ResourceCache<Rs, As> =
HashMap<Rs, Shared<Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>>>;
#[derive(Debug)]
pub struct Cache<I, Rs, As> {
resources: RefCell<ResourceCache<Rs, As>>,
inner: Rc<I>,
}
impl<I, Rs, As> Cache<I, Rs, As> {
pub fn new(inner: I) -> Self {
Self {
resources: RefCell::default(),
inner: Rc::new(inner),
}
}
}
#[async_trait(?Send)]
impl<R: AddableResource + Debug, I, Rs, As> Add<R> for Cache<I, Rs, As>
where
Rs: Hash + Eq + 'static + FromResource<R>,
As: 'static + FromArtifact<R> + Clone,
I: 'static + Add<R>,
{
// FIXME: https://github.com/rust-lang/rust-clippy/issues/6353
#[allow(clippy::await_holding_refcell_ref)]
async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
let (storable_resource, weak_resource) = Rs::from_resource(&resource);
let mut resources = self.resources.borrow_mut();
let future = if let Some(future) = resources.get(&storable_resource) {
assert!(
!force_run,
"Forcing to run an already-added resource is a logical error"
);
trace!(logger, "Resource already added");
future.clone()
} else {
let inner_weak = Rc::downgrade(&self.inner);
let logger_weak = Rc::downgrade(logger);
let future = (Box::pin(async move {
let inner = inner_weak.upgrade().expect("Dangling!");
let logger = logger_weak.upgrade().expect("Dangling!");
let resource = weak_resource.upgrade().expect("Dangling!");
let result = inner.add(&logger, Rc::clone(&resource), force_run).await;
// Need to convert Box<Error> to String for Clone for Shared
result
.map(|(t, did_run)| (As::from_artifact(t), did_run))
.map_err(|e| e.to_string())
}) as Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>)
.shared();
resources.insert(storable_resource, future.clone());
future
};
drop(resources);
future
.await
.map(|(t, did_run)| (t.into_artifact(), did_run))
.map_err(std::convert::Into::into)
}
}
#[cfg(test)]
mod test {
use super::{Add, AddResult, Cache, Logger};
use crate::async_utils::{join, run};
use crate::resources::{FromArtifact, FromResource, Resource};
use async_trait::async_trait;
use std::fmt::Debug;
use std::rc::{Rc, Weak};
#[derive(Debug, PartialEq, Eq, Hash)]
struct TestResource<T>(&'static str, T);
impl<T> Resource for TestResource<T> {
type Artifact = ();
}
#[derive(Debug, Hash, PartialEq, Eq)]
enum Resources {
A(Rc<TestResource<&'static str>>),
B(Rc<TestResource<()>>),
}
impl FromResource<TestResource<&'static str>> for Resources {
fn from_resource(
inner: &Rc<TestResource<&'static str>>,
) -> (Self, Weak<TestResource<&'static str>>) {
(Self::A(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
impl FromResource<TestResource<()>> for Resources {
fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
(Self::B(Rc::clone(&inner)), Rc::downgrade(&inner))
}
}
#[derive(Clone)]
struct Artifacts;
impl<V> FromArtifact<TestResource<V>> for Artifacts {
fn from_artifact(_from: ()) -> Self {
Self
}
#[allow(clippy::unused_unit)]
fn into_artifact(self) -> () {
#[allow(clippy::unused_unit)]
()
}
}
struct Inner;
#[async_trait(?Send)]
impl<T: Debug + 'static> Add<TestResource<T>> for Inner {
async fn add(
&self,
logger: &Rc<Logger>,
resource: Rc<TestResource<T>>,
force_run: bool,
) -> AddResult<TestResource<T>> {
Ok(((), resource.0 != "reached"))
}
}
#[test]
fn test() {
let log = Rc::new(slog::Logger::root(slog::Discard, slog::o!()));
let cache: Cache<_, Resources, Artifacts> = Cache::new(Inner);
run(async {
let reached = cache.add(&log, Rc::new(TestResource("reached", ())), false);
let a = cache.add(&log, Rc::new(TestResource("a", ())), false);
let b = cache.add(&log, Rc::new(TestResource("b", ())), false);
let (reached, a, b) = join!(reached, a, b);
assert_eq!(((), false), reached.unwrap());
assert_eq!(((), true), a.unwrap());
assert_eq!(((), true), b.unwrap());
})
}
}