A library for writing host-specific, single-binary configuration management and deployment tools
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
2.0 KiB

  1. use std::{
  2. future::Future,
  3. pin::Pin,
  4. sync::{Arc, Mutex},
  5. task::{Context, Poll, Waker},
  6. thread,
  7. time::Duration,
  8. };
  9. use tokio::runtime::Builder;
  10. pub use async_trait::async_trait;
  11. pub fn run<F: Future>(future: F) -> F::Output {
  12. Builder::new_current_thread()
  13. .build()
  14. .unwrap()
  15. .block_on(future)
  16. }
  17. pub use tokio::join;
  18. pub use tokio::try_join;
  19. #[derive(Debug)]
  20. pub struct TimerFuture {
  21. state: Arc<Mutex<State>>,
  22. }
  23. #[derive(Debug)]
  24. enum State {
  25. NotStarted(Duration),
  26. Running(Waker),
  27. Completed,
  28. }
  29. impl Future for TimerFuture {
  30. type Output = ();
  31. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  32. let mut state = self.state.lock().unwrap();
  33. if let State::Completed = *state {
  34. return Poll::Ready(());
  35. }
  36. if let State::NotStarted(duration) = *state {
  37. let thread_state = self.state.clone();
  38. thread::spawn(move || {
  39. thread::sleep(duration);
  40. let mut state = thread_state.lock().unwrap();
  41. let waker = if let State::Running(waker) = &*state {
  42. Some(waker.clone())
  43. } else {
  44. None
  45. };
  46. *state = State::Completed;
  47. if let Some(w) = waker {
  48. w.wake()
  49. }
  50. });
  51. }
  52. *state = State::Running(cx.waker().clone());
  53. Poll::Pending
  54. }
  55. }
  56. pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
  57. TimerFuture {
  58. state: Arc::new(Mutex::new(State::NotStarted(duration))),
  59. }
  60. }
  61. #[cfg(test)]
  62. mod test {
  63. use crate::async_utils::{run, sleep};
  64. use futures::future::FutureExt;
  65. use std::time::{Duration, Instant};
  66. #[test]
  67. fn test_sleep() {
  68. run(async {
  69. let start = Instant::now();
  70. let sleep = sleep(Duration::from_millis(100)).fuse();
  71. let ok = async {}.fuse();
  72. futures::pin_mut!(sleep, ok);
  73. loop {
  74. futures::select! {
  75. _ = sleep => {},
  76. _ = ok => assert!((Instant::now() - start).as_millis() < 100),
  77. complete => break,
  78. }
  79. }
  80. })
  81. }
  82. }