A library for writing host-specific, single-binary configuration management and deployment tools
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

93 lines
2.0 KiB

  1. use std::{
  2. future::Future,
  3. pin::Pin,
  4. sync::{Arc, Mutex},
  5. task::{Context, Poll, Waker},
  6. thread,
  7. time::Duration,
  8. };
  9. use tokio::runtime::Builder;
  10. pub use async_trait::async_trait;
  11. pub fn run<F: Future>(future: F) -> F::Output {
  12. Builder::new_current_thread()
  13. .enable_io()
  14. .build()
  15. .unwrap()
  16. .block_on(future)
  17. }
  18. pub use tokio::join;
  19. pub use tokio::try_join;
  /// A timer future that does not depend on any runtime's time driver.
  ///
  /// Nothing happens until the future is polled for the first time. That first poll
  /// spawns a helper thread which sleeps for the requested duration, marks the timer
  /// as completed and wakes the task that polled most recently.
  #[derive(Debug)]
  pub struct TimerFuture {
      /// State shared between the future and its helper thread.
      state: Arc<Mutex<State>>,
  }

  /// Progress of a [`TimerFuture`].
  #[derive(Debug)]
  enum State {
      /// Not polled yet; holds the duration to sleep once the timer is started.
      NotStarted(Duration),
      /// The helper thread is sleeping; holds the waker of the most recent poll.
      Running(Waker),
      /// The duration has elapsed.
      Completed,
  }

  impl Future for TimerFuture {
      type Output = ();

      /// Starts the timer on the first call, afterwards reports whether it has fired.
      ///
      /// Returns `Poll::Ready(())` once the helper thread has marked the timer as
      /// completed and `Poll::Pending` before that. While pending, the stored waker is
      /// kept in sync with the task that polled last.
      ///
      /// # Panics
      ///
      /// Panics if the state mutex is poisoned or if the helper thread cannot be spawned.
      fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
          let mut state = self.state.lock().unwrap();
          let duration = match &mut *state {
              State::Completed => return Poll::Ready(()),
              State::Running(waker) => {
                  // Only pay for a clone when the task behind the waker has changed.
                  if !waker.will_wake(cx.waker()) {
                      *waker = cx.waker().clone();
                  }
                  return Poll::Pending;
              }
              State::NotStarted(duration) => *duration,
          };

          // First poll: start the helper thread. It cannot observe `NotStarted`, because
          // this poll keeps the lock until the state below has been set to `Running`.
          let thread_state = Arc::clone(&self.state);
          thread::spawn(move || {
              thread::sleep(duration);
              // Move the waker out of the shared state and release the lock *before*
              // waking, so the woken task can lock the state right away and a panicking
              // waker cannot poison the mutex.
              let previous = {
                  let mut state = thread_state.lock().unwrap();
                  std::mem::replace(&mut *state, State::Completed)
              };
              if let State::Running(waker) = previous {
                  waker.wake();
              }
          });
          *state = State::Running(cx.waker().clone());
          Poll::Pending
      }
  }
  57. pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
  58. TimerFuture {
  59. state: Arc::new(Mutex::new(State::NotStarted(duration))),
  60. }
  61. }
  #[cfg(test)]
  mod test {
      use crate::async_utils::{run, sleep};
      use futures_util::future::FutureExt;
      use std::time::{Duration, Instant};

      /// Races a 100 ms `sleep` against an already-ready future and asserts that the
      /// ready future is handled before 100 ms have elapsed; the loop ends once both
      /// futures have completed.
      #[test]
      fn test_sleep() {
          run(async {
              let started = Instant::now();
              let timer = sleep(Duration::from_millis(100)).fuse();
              let immediate = async {}.fuse();
              futures_util::pin_mut!(timer, immediate);
              loop {
                  futures_util::select! {
                      _ = timer => {},
                      _ = immediate => assert!(started.elapsed().as_millis() < 100),
                      complete => break,
                  }
              }
          })
      }
  }