A library for writing host-specific, single-binary configuration management and deployment tools
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

87 lines
1.9 KiB

2 years ago
2 years ago
  1. use std::{
  2. future::Future,
  3. pin::Pin,
  4. sync::{Arc, Mutex},
  5. task::{Context, Poll, Waker},
  6. thread,
  7. time::Duration,
  8. };
  9. use tokio::runtime::Builder;
  10. pub use async_trait::async_trait;
  11. pub fn run<F: Future>(future: F) -> F::Output {
  12. Builder::new_current_thread()
  13. .enable_io()
  14. .build()
  15. .expect("Error setting up async runtime")
  16. .block_on(future)
  17. }
  18. pub use tokio::join;
  19. pub use tokio::try_join;
  /// A future that resolves once a fixed [`Duration`] has elapsed.
  ///
  /// The wait is carried out by a dedicated thread that is spawned on the
  /// first call to `poll`. Dropping the future does not stop that thread; it
  /// keeps sleeping until the duration has elapsed and then exits.
  #[derive(Debug)]
  pub struct TimerFuture {
      /// State shared between the future and its timer thread.
      state: Arc<Mutex<State>>,
  }

  /// Lifecycle of a [`TimerFuture`].
  #[derive(Debug)]
  enum State {
      /// Not polled yet; holds the duration to wait once polling starts.
      NotStarted(Duration),
      /// The timer thread has been spawned; holds the waker to notify when
      /// the duration has elapsed.
      Running(Waker),
      /// The duration has elapsed.
      Completed,
  }

  impl Future for TimerFuture {
      type Output = ();

      /// Returns `Poll::Ready(())` once the timer thread has marked the state
      /// as completed, and `Poll::Pending` otherwise.
      ///
      /// The first poll spawns the timer thread. Every pending poll makes
      /// sure the stored waker is one that wakes the current task.
      ///
      /// # Panics
      ///
      /// Panics if the state mutex is poisoned or if the timer thread cannot
      /// be spawned.
      fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
          let mut state = self.state.lock().unwrap();
          match &mut *state {
              State::Completed => return Poll::Ready(()),
              State::Running(waker) => {
                  // Only replace the stored waker when it would wake a
                  // different task; this avoids a clone on every re-poll.
                  if !waker.will_wake(cx.waker()) {
                      *waker = cx.waker().clone();
                  }
                  return Poll::Pending;
              }
              State::NotStarted(duration) => {
                  let duration = *duration;
                  let thread_state = Arc::clone(&self.state);
                  thread::spawn(move || {
                      thread::sleep(duration);
                      // Mark completion and release the lock *before* waking:
                      // a waker may poll the future or inspect its state
                      // right away, which must not find the mutex still held
                      // by this thread.
                      let previous = {
                          let mut state = thread_state.lock().unwrap();
                          std::mem::replace(&mut *state, State::Completed)
                      };
                      if let State::Running(waker) = previous {
                          waker.wake();
                      }
                  });
              }
          }
          // First poll: the timer thread cannot observe the state before this
          // guard is dropped, so it is guaranteed to find the waker.
          *state = State::Running(cx.waker().clone());
          Poll::Pending
      }
  }
  52. pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
  53. TimerFuture {
  54. state: Arc::new(Mutex::new(State::NotStarted(duration))),
  55. }
  56. }
  #[cfg(test)]
  mod test {
      use crate::async_utils::{run, sleep};
      use futures_util::future::FutureExt;
      use std::time::{Duration, Instant};

      /// Races `sleep` against an immediately-ready future.
      ///
      /// The ready future must finish before the delay has passed, showing
      /// that the pending timer does not block other futures. The timer must
      /// not finish before the delay has passed, showing that `sleep`
      /// actually waits.
      #[test]
      fn test_sleep() {
          const DELAY: Duration = Duration::from_millis(100);
          run(async {
              let start = Instant::now();
              let sleep = sleep(DELAY).fuse();
              let ok = async {}.fuse();
              futures_util::pin_mut!(sleep, ok);
              loop {
                  futures_util::select! {
                      // Without this check a timer that resolved immediately
                      // would go unnoticed.
                      _ = sleep => assert!(start.elapsed() >= DELAY),
                      _ = ok => assert!(start.elapsed() < DELAY),
                      complete => break,
                  }
              }
          });
      }
  }