A library for writing host-specific, single-binary configuration management and deployment tools
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

91 lines
1.9 KiB

  1. use std::{
  2. future::Future,
  3. pin::Pin,
  4. sync::{Arc, Mutex},
  5. task::{Context, Poll, Waker},
  6. thread,
  7. time::Duration,
  8. };
  9. use tokio::runtime::Builder;
  10. pub use async_trait::async_trait;
  11. pub fn run<F: Future>(future: F) -> F::Output {
  12. Builder::new_current_thread()
  13. .build()
  14. .unwrap()
  15. .block_on(future)
  16. }
  17. pub use tokio::try_join;
  18. #[derive(Debug)]
  19. pub struct TimerFuture {
  20. state: Arc<Mutex<State>>,
  21. }
  22. #[derive(Debug)]
  23. enum State {
  24. NotStarted(Duration),
  25. Running(Waker),
  26. Completed,
  27. }
  28. impl Future for TimerFuture {
  29. type Output = ();
  30. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  31. let mut state = self.state.lock().unwrap();
  32. if let State::Completed = *state {
  33. return Poll::Ready(());
  34. }
  35. if let State::NotStarted(duration) = *state {
  36. let thread_state = self.state.clone();
  37. thread::spawn(move || {
  38. thread::sleep(duration);
  39. let mut state = thread_state.lock().unwrap();
  40. let waker = if let State::Running(waker) = &*state {
  41. Some(waker.clone())
  42. } else {
  43. None
  44. };
  45. *state = State::Completed;
  46. if let Some(w) = waker {
  47. w.wake()
  48. }
  49. });
  50. }
  51. *state = State::Running(cx.waker().clone());
  52. Poll::Pending
  53. }
  54. }
  55. pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
  56. TimerFuture {
  57. state: Arc::new(Mutex::new(State::NotStarted(duration))),
  58. }
  59. }
  60. #[cfg(test)]
  61. mod test {
  62. use crate::async_utils::{run, sleep};
  63. use futures::future::FutureExt;
  64. use std::time::{Duration, Instant};
  65. #[test]
  66. fn test_sleep() {
  67. run(async {
  68. let start = Instant::now();
  69. let sleep = sleep(Duration::from_millis(100)).fuse();
  70. let ok = async {}.fuse();
  71. futures::pin_mut!(sleep, ok);
  72. loop {
  73. futures::select! {
  74. _ = sleep => {},
  75. _ = ok => assert!((Instant::now() - start).as_millis() < 100),
  76. complete => break,
  77. }
  78. }
  79. })
  80. }
  81. }