devela/work/async/coroutine/
coro.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// devela::work::async::coroutine
//
//! A minimal single-threaded coroutine implementation.
//!
//! This code demonstrates a basic cooperative multitasking system where tasks
//! can yield control back to the scheduler and be resumed later.
//!
//! This is the fundamental concept behind coroutines.
//!
//! This is based on:
//! - <https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/>
//! - <https://www.reddit.com/r/rust/comments/etqwhx/a_stackless_rust_coroutine_library_under_100_loc/>
//

use crate::{serr, sok, Debug, Future, OptRes, Pin, TaskContext, TaskPoll};
#[cfg(feature = "alloc")]
use crate::{task_waker_noop, Box, VecDeque};

/* coroutine */

/// Represents a single-threaded stackless coroutine.
///
/// It has a private status that can be either running or halted, and a slot
/// that holds the most recently yielded result until the matching
/// [`CoroYield`] hands it back.
#[derive(Clone, Copy, Debug)]
pub struct Coro<T, E> {
    // Whether the coroutine is currently running or halted at a yield point.
    status: CoroStatus,
    // Set by `yield_ok`/`yield_err`; taken (leaving `None`) by `CoroYield::poll`.
    result: OptRes<T, E>,
}

// Private coroutine status, toggled by `CoroYield::poll` on every poll.
#[derive(Clone, Copy, Debug)]
enum CoroStatus {
    // Set when a yield is polled while running and returns `Pending`;
    // the next poll of that yield switches back to `Running`.
    Halted,
    // Initial status set by `Coro::new`; also restored when a halted yield
    // is polled again and returns its result.
    Running,
}

impl<T, E> Coro<T, E> {
    // Returns a new coroutine.
    #[allow(unused)]
    const fn new() -> Self {
        Coro { status: CoroStatus::Running, result: None }
    }

    /// Yields an [`Ok`] `value` and returns an awaitable CoroYield.
    pub fn yield_ok(&mut self, value: T) -> CoroYield<'_, T, E> {
        self.result = sok(value);
        CoroYield { cor: self }
    }

    /// Yields an [`Err`] and returns an awaitable future.
    pub fn yield_err(&mut self, error: E) -> CoroYield<'_, T, E> {
        self.result = serr(error);
        CoroYield { cor: self }
    }
}

/* yielder */

/// A future that alternates between [`Pending`][TaskPoll::Pending] and
/// [`Ready`][TaskPoll::Ready] status each time it's polled.
///
/// Polled while the coroutine is running, it halts the coroutine and returns
/// `Pending`; polled while halted, it marks the coroutine as running again
/// and returns the stored result as `Ready`.
///
/// This allows the coroutine to yield control back and be resumed later.
pub struct CoroYield<'a, T, E> {
    // The coroutine whose status and pending result this yield point drives.
    cor: &'a mut Coro<T, E>,
}

impl<T, E> Future for CoroYield<'_, T, E> {
    type Output = OptRes<T, E>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut TaskContext) -> TaskPoll<OptRes<T, E>> {
        match self.cor.status {
            CoroStatus::Halted => {
                self.cor.status = CoroStatus::Running;
                if let Some(result) = self.cor.result.take() {
                    match result {
                        Err(error) => TaskPoll::Ready(serr(error)),
                        Ok(value) => TaskPoll::Ready(sok(value)),
                    }
                } else {
                    unreachable!();
                }
            }
            CoroStatus::Running => {
                self.cor.status = CoroStatus::Halted;
                TaskPoll::Pending
            }
        }
    }
}

/* runner */

/// A managed dynamic collection of single-thread [`Coro`]utines.
///
/// It maintains a queue of boxed coroutines, and runs them in a loop until
/// they are all complete.
///
/// When a coroutine is polled and returns [`TaskPoll::Pending`], it is put back
/// into the queue to be run again later. If it returns [`TaskPoll::Ready`]
/// it is considered complete and is not put back into the queue.
///
/// # Examples
/// ```
#[doc = include_str!("../../../../examples/work/coro_run.rs")]
/// ```
/// It outputs:
/// ```text
/// Running
/// > instance 1 NEW
/// > instance 2 NEW
/// > instance 3 NEW
/// > instance 4 NEW
///   instance 1 A.0 Ok('a'))
///   instance 2 A.0 Ok('a'))
///   instance 3 A.0 Ok('a'))
///   instance 1 B Ok('b')
///   instance 2 B Ok('b')
///   instance 3 B Ok('b')
///   instance 1 A.1 Ok('a'))
///   instance 2 A.1 Ok('a'))
///   instance 3 A.1 Ok('a'))
///   instance 4 BYE!
///   instance 1 B Ok('b')
///   instance 2 B Ok('b')
///   instance 3 B Ok('b')
///   instance 1 A.2 Ok('a'))
///   instance 2 A.2 Ok('a'))
///   instance 3 A.2 Ok('a'))
///   instance 1 B Ok('b')
///   instance 2 B Ok('b')
///   instance 3 B Ok('b')
///   instance 1 A.3 Ok('a'))
///   instance 2 A.3 Ok('a'))
///   instance 3 A.3 Ok('a'))
///   instance 1 B Ok('b')
///   instance 2 B Ok('b')
///   instance 3 B Ok('b')
/// Done
/// ```
#[cfg(feature = "alloc")]
#[cfg_attr(feature = "nightly_doc", doc(cfg(feature = "alloc")))]
pub struct CoroRun<T, E> {
    // Coroutines still to be polled; the front is polled next and pending
    // ones are re-queued at the back.
    #[allow(clippy::type_complexity)]
    coros: VecDeque<Pin<Box<dyn Future<Output = OptRes<T, E>>>>>,
}

// Implemented by hand instead of derived: `#[derive(Default)]` would require
// `T: Default` and `E: Default`, although an empty queue needs neither.
#[cfg(feature = "alloc")]
impl<T, E> Default for CoroRun<T, E> {
    /// Returns a new empty runner.
    fn default() -> Self {
        CoroRun { coros: VecDeque::new() }
    }
}

#[cfg(feature = "alloc")]
impl<T, E: 'static + Debug> CoroRun<T, E> {
    /// Creates a runner with no coroutines queued.
    pub fn new() -> Self {
        Self { coros: VecDeque::new() }
    }

    /// Queues the future produced by `closure` at the back of the runner.
    ///
    /// The closure is invoked right away with a fresh [`Coro`]; the future it
    /// returns is boxed and pinned, and is not polled by this method.
    pub fn push<C, F>(&mut self, closure: C)
    where
        C: FnOnce(Coro<T, E>) -> F,
        F: Future<Output = OptRes<T, E>> + 'static,
    {
        let future = closure(Coro::new());
        self.coros.push_back(Box::pin(future));
    }

    /// Polls the queued coroutines in turn until the queue is empty.
    ///
    /// Each coroutine is taken from the front and polled once with a no-op
    /// waker. A pending coroutine goes to the back of the queue; a ready one
    /// is dropped together with its result.
    pub fn run(&mut self) {
        let waker = task_waker_noop();
        let mut context = TaskContext::from_waker(&waker);

        while let Some(mut cor) = self.coros.pop_front() {
            let outcome = cor.as_mut().poll(&mut context);
            match outcome {
                // Finished: the final result, `Ok` or `Err`, is discarded.
                TaskPoll::Ready(_) => {}
                // Not finished yet: give the other coroutines a turn first.
                TaskPoll::Pending => self.coros.push_back(cor),
            }
        }
    }
}