devela/work/async/block.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
// devela::work::async::block_on
//
// Original source code by Joshua Barretto, licensed as MIT OR Apache-2.0
// https://crates.io/crates/pollster/0.3.0
//
// MODIFICATIONS:
// - removed the macro.
// - removed all unsafe.
// - renamed the function.
// - misc. refactorings.
//
// MAYBE IMPROVE:
// - [Lock-free and alloc-free implementation](https://github.com/zesterer/pollster/pull/9)
// - [add benchmarks](https://github.com/zesterer/pollster/pull/20)
use crate::{pin, Arc, Condvar, Future, Mutex, TaskContext, TaskPoll, TaskWake, TaskWaker};
/// Blocks the thread until the `future` is ready.
///
/// See also the [`ExtFuture`][super::ExtFuture] trait.
///
/// # Examples
/// ```
/// let future = async {};
/// let result = devela::work::future_block(future);
/// ```
pub fn future_block<F: Future>(mut future: F) -> F::Output {
// Pin the future so that it can be polled.
let mut future = pin!(future);
// Signal used to wake up the thread for polling as the future moves to completion. We need to
// use an `Arc` because, although the lifetime of `future` is limited to this function, the
// underlying IO abstraction might keep the signal alive for far longer. `Arc` is a thread-safe
// way to allow this to happen.
// MAYBE: Investigate ways to reuse this `Arc<Signal>`... perhaps via a `static`?
let signal = Arc::new(Signal::new());
// Create a context that will be passed to the future.
let waker = TaskWaker::from(Arc::clone(&signal));
let mut context = TaskContext::from_waker(&waker);
// Poll the future to completion
loop {
match future.as_mut().poll(&mut context) {
TaskPoll::Pending => signal.wait(),
TaskPoll::Ready(item) => break item,
}
}
}
struct Signal {
state: Mutex<SignalState>,
cond: Condvar,
}
enum SignalState {
Empty,
Waiting,
Notified,
}
impl TaskWake for Signal {
fn wake(self: Arc<Self>) {
self.notify();
}
}
impl Signal {
fn new() -> Self {
Self {
state: Mutex::new(SignalState::Empty),
cond: Condvar::new(),
}
}
fn wait(&self) {
let mut state = self.state.lock().unwrap();
match *state {
SignalState::Notified => {
// notify() was called before we got here,
// consume it here without waiting and return immediately.
*state = SignalState::Empty;
}
// This should not be possible because our signal is created within a function and
// never handed out to any other threads. If this is the case, we have a serious
// problem so we panic immediately to avoid anything more problematic happening.
SignalState::Waiting => {
unreachable!("Multiple threads waiting on the same signal: Open a bug report!");
}
SignalState::Empty => {
// Nothing has happened yet, and we're the only thread waiting (as should be the
// case!). Set the state accordingly and begin polling the condvar in a loop until
// it's no longer telling us to wait. The loop prevents incorrect spurious wakeups.
*state = SignalState::Waiting;
while let SignalState::Waiting = *state {
state = self.cond.wait(state).unwrap();
}
}
}
}
fn notify(&self) {
let mut state = self.state.lock().unwrap();
match *state {
// The signal was already notified, no need to do anything because the thread will be
// waking up anyway
SignalState::Notified => {}
// The signal wasn't notified but a thread isnt waiting on it, so we can avoid doing
// unnecessary work by skipping the condvar and leaving behind a message telling the
// thread that a notification has already occurred should it come along in the future.
SignalState::Empty => *state = SignalState::Notified,
// The signal wasn't notified and there's a waiting thread. Reset the signal so it can
// be waited on again and wake up the thread. Because there should only be a single
// thread waiting, `notify_all` would also be valid.
SignalState::Waiting => {
*state = SignalState::Empty;
self.cond.notify_one();
}
}
}
}