A library for writing host-specific, single-binary configuration management and deployment tools
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

153 lines
4.6 KiB

  1. use super::{Add, AddResult, AddableResource};
  2. use crate::resources::{FromArtifact, FromResource};
  3. use async_trait::async_trait;
  4. use futures_util::future::{FutureExt, Shared};
  5. use slog::{trace, Logger};
  6. use std::cell::RefCell;
  7. use std::collections::HashMap;
  8. use std::fmt::Debug;
  9. use std::future::Future;
  10. use std::hash::Hash;
  11. use std::pin::Pin;
  12. use std::rc::Rc;
  13. // FIXME: Switch Error to Rc
  14. type ResourceCache<Rs, As> =
  15. HashMap<Rs, Shared<Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>>>;
  16. #[derive(Debug)]
  17. pub struct Cache<I, Rs, As> {
  18. resources: RefCell<ResourceCache<Rs, As>>,
  19. inner: Rc<I>,
  20. }
  21. impl<I, Rs, As> Cache<I, Rs, As> {
  22. pub fn new(inner: I) -> Self {
  23. Self {
  24. resources: RefCell::default(),
  25. inner: Rc::new(inner),
  26. }
  27. }
  28. }
  29. #[async_trait(?Send)]
  30. impl<R: AddableResource + Debug, I, Rs, As> Add<R> for Cache<I, Rs, As>
  31. where
  32. Rs: Hash + Eq + 'static + FromResource<R>,
  33. As: 'static + FromArtifact<R> + Clone,
  34. I: 'static + Add<R>,
  35. {
  36. // FIXME: https://github.com/rust-lang/rust-clippy/issues/6353
  37. #[allow(clippy::await_holding_refcell_ref)]
  38. async fn add(&self, logger: &Rc<Logger>, resource: Rc<R>, force_run: bool) -> AddResult<R> {
  39. let (storable_resource, weak_resource) = Rs::from_resource(&resource);
  40. let mut resources = self.resources.borrow_mut();
  41. let future = if let Some(future) = resources.get(&storable_resource) {
  42. assert!(
  43. !force_run,
  44. "Forcing to run an already-added resource is a logical error"
  45. );
  46. trace!(logger, "Resource already added");
  47. future.clone()
  48. } else {
  49. let inner_weak = Rc::downgrade(&self.inner);
  50. let logger_weak = Rc::downgrade(logger);
  51. let future = (Box::pin(async move {
  52. let inner = inner_weak.upgrade().expect("Dangling!");
  53. let logger = logger_weak.upgrade().expect("Dangling!");
  54. let resource = weak_resource.upgrade().expect("Dangling!");
  55. let result = inner.add(&logger, Rc::clone(&resource), force_run).await;
  56. // Need to convert Box<Error> to String for Clone for Shared
  57. result
  58. .map(|(t, did_run)| (As::from_artifact(t), did_run))
  59. .map_err(|e| e.to_string())
  60. }) as Pin<Box<dyn Future<Output = Result<(As, bool), String>>>>)
  61. .shared();
  62. resources.insert(storable_resource, future.clone());
  63. future
  64. };
  65. drop(resources);
  66. future
  67. .await
  68. .map(|(t, did_run)| (t.into_artifact(), did_run))
  69. .map_err(std::convert::Into::into)
  70. }
  71. }
  #[cfg(test)]
  mod test {
      use super::{Add, AddResult, Cache, Logger};
      use crate::async_utils::{join, run};
      use crate::resources::{FromArtifact, FromResource, Resource};
      use async_trait::async_trait;
      use std::cell::Cell;
      use std::fmt::Debug;
      use std::rc::{Rc, Weak};

      /// Minimal resource: a name plus an arbitrary payload, with a unit artifact.
      #[derive(Debug, PartialEq, Eq, Hash)]
      struct TestResource<T>(&'static str, T);

      impl<T> Resource for TestResource<T> {
          type Artifact = ();
      }

      /// Storable form of the test resources, used as the cache key.
      #[derive(Debug, Hash, PartialEq, Eq)]
      enum Resources {
          A(Rc<TestResource<&'static str>>),
          B(Rc<TestResource<()>>),
      }

      impl FromResource<TestResource<&'static str>> for Resources {
          fn from_resource(
              inner: &Rc<TestResource<&'static str>>,
          ) -> (Self, Weak<TestResource<&'static str>>) {
              // `inner` is already a reference; no extra borrow is needed.
              (Self::A(Rc::clone(inner)), Rc::downgrade(inner))
          }
      }

      impl FromResource<TestResource<()>> for Resources {
          fn from_resource(inner: &Rc<TestResource<()>>) -> (Self, Weak<TestResource<()>>) {
              (Self::B(Rc::clone(inner)), Rc::downgrade(inner))
          }
      }

      /// Storable form of the (unit) artifacts.
      #[derive(Clone)]
      struct Artifacts;

      impl<V> FromArtifact<TestResource<V>> for Artifacts {
          fn from_artifact(_from: ()) -> Self {
              Self
          }

          fn into_artifact(self) {}
      }

      /// Inner `Add` implementation that counts how often it is invoked, so the
      /// tests can observe whether the cache forwarded a call.
      #[derive(Default)]
      struct Inner {
          calls: Cell<usize>,
      }

      #[async_trait(?Send)]
      impl<T: Debug + 'static> Add<TestResource<T>> for Inner {
          async fn add(
              &self,
              _logger: &Rc<Logger>,
              resource: Rc<TestResource<T>>,
              _force_run: bool,
          ) -> AddResult<TestResource<T>> {
              self.calls.set(self.calls.get() + 1);
              // Every resource except the one named "reached" reports that it ran.
              Ok(((), resource.0 != "reached"))
          }
      }

      /// Distinct resources are each forwarded to the inner implementation.
      #[test]
      fn test() {
          let log = Rc::new(slog::Logger::root(slog::Discard, slog::o!()));
          let cache: Cache<_, Resources, Artifacts> = Cache::new(Inner::default());
          run(async {
              let reached = cache.add(&log, Rc::new(TestResource("reached", ())), false);
              let a = cache.add(&log, Rc::new(TestResource("a", ())), false);
              let b = cache.add(&log, Rc::new(TestResource("b", ())), false);
              let (reached, a, b) = join!(reached, a, b);
              assert_eq!(((), false), reached.unwrap());
              assert_eq!(((), true), a.unwrap());
              assert_eq!(((), true), b.unwrap());
          })
      }

      /// Adding an equal resource a second time is answered from the cache: the
      /// inner implementation runs once and both calls see the same result.
      #[test]
      fn test_repeated_add_hits_cache() {
          let log = Rc::new(slog::Logger::root(slog::Discard, slog::o!()));
          let cache: Cache<_, Resources, Artifacts> = Cache::new(Inner::default());
          run(async {
              let first = cache
                  .add(&log, Rc::new(TestResource("a", ())), false)
                  .await;
              let second = cache
                  .add(&log, Rc::new(TestResource("a", ())), false)
                  .await;
              assert_eq!(((), true), first.unwrap());
              assert_eq!(((), true), second.unwrap());
              assert_eq!(1, cache.inner.calls.get());
          })
      }
  }