devela/work/future/coroutine/
coro.rs

1// devela::work::future::coroutine
2//
3//! A minimal single-threaded coroutine implementation.
4//!
5//! This code demonstrates a basic cooperative multitasking system where tasks
6//! can yield control back to the scheduler and be resumed later.
7//!
8//! This is the fundamental concept behind coroutines.
9//!
10//! This is based on:
11//! - <https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/>
12//! - <https://www.reddit.com/r/rust/comments/etqwhx/a_stackless_rust_coroutine_library_under_100_loc/>
13//
14
15use crate::{serr, sok, Debug, Future, OptRes, Pin, TaskContext, TaskPoll};
16#[cfg(feature = "alloc")]
17use crate::{Box, TaskWaker, VecDeque};
18
19/* coroutine */
20
21/// Represents a single thread stackless coroutine.
22///
23/// It has a private status that can be either running or halted.
24#[derive(Clone, Copy, Debug)]
25pub struct Coro<T, E> {
26    status: CoroStatus,
27    result: OptRes<T, E>,
28}
29
30// Private coroutine status.
31#[derive(Clone, Copy, Debug)]
32enum CoroStatus {
33    Halted,
34    Running,
35}
36
37impl<T, E> Coro<T, E> {
38    // Returns a new coroutine.
39    #[allow(unused)]
40    const fn new() -> Self {
41        Coro { status: CoroStatus::Running, result: None }
42    }
43
44    /// Yields an [`Ok`] `value` and returns an awaitable CoroYield.
45    pub fn yield_ok(&mut self, value: T) -> CoroYield<'_, T, E> {
46        self.result = sok(value);
47        CoroYield { cor: self }
48    }
49
50    /// Yields an [`Err`] and returns an awaitable future.
51    pub fn yield_err(&mut self, error: E) -> CoroYield<'_, T, E> {
52        self.result = serr(error);
53        CoroYield { cor: self }
54    }
55}
56
57/* yielder */
58
59/// A future that alternates between [`Ready`][TaskPoll::Ready] and
60/// [`Pending`][TaskPoll::Pending] status each time it's polled.
61///
62/// This allows the coroutine to yield control back and be resumed later.
63pub struct CoroYield<'a, T, E> {
64    cor: &'a mut Coro<T, E>,
65}
66
67impl<T, E> Future for CoroYield<'_, T, E> {
68    type Output = OptRes<T, E>;
69
70    fn poll(mut self: Pin<&mut Self>, _cx: &mut TaskContext) -> TaskPoll<OptRes<T, E>> {
71        match self.cor.status {
72            CoroStatus::Halted => {
73                self.cor.status = CoroStatus::Running;
74                if let Some(result) = self.cor.result.take() {
75                    match result {
76                        Err(error) => TaskPoll::Ready(serr(error)),
77                        Ok(value) => TaskPoll::Ready(sok(value)),
78                    }
79                } else {
80                    unreachable!();
81                }
82            }
83            CoroStatus::Running => {
84                self.cor.status = CoroStatus::Halted;
85                TaskPoll::Pending
86            }
87        }
88    }
89}
90
91/* runner */
92
93/// A managed dynamic collection of single-thread [`Coro`]utines.
94///
95/// It maintains a queue of coroutines in the stack, and runs them in a loop until
96/// they are all complete.
97///
98/// When a coroutine is polled and returns [`TaskPoll::Pending`], it is put back
99/// into the queue to be run again later. If it returns [`TaskPoll::Ready`]
100/// it is considered complete and is not put back into the queue.
101///
102/// # Examples
103/// ```
104#[doc = include_str!("../../../../examples/work/coro_run.rs")]
105/// ```
106/// It outputs:
107/// ```text
108/// Running
109/// > instance 1 NEW
110/// > instance 2 NEW
111/// > instance 3 NEW
112/// > instance 4 NEW
113///   instance 1 A.0 Ok('a'))
114///   instance 2 A.0 Ok('a'))
115///   instance 3 A.0 Ok('a'))
116///   instance 1 B Ok('b')
117///   instance 2 B Ok('b')
118///   instance 3 B Ok('b')
119///   instance 1 A.1 Ok('a'))
120///   instance 2 A.1 Ok('a'))
121///   instance 3 A.1 Ok('a'))
122///   instance 4 BYE!
123///   instance 1 B Ok('b')
124///   instance 2 B Ok('b')
125///   instance 3 B Ok('b')
126///   instance 1 A.2 Ok('a'))
127///   instance 2 A.2 Ok('a'))
128///   instance 3 A.2 Ok('a'))
129///   instance 1 B Ok('b')
130///   instance 2 B Ok('b')
131///   instance 3 B Ok('b')
132///   instance 1 A.3 Ok('a'))
133///   instance 2 A.3 Ok('a'))
134///   instance 3 A.3 Ok('a'))
135///   instance 1 B Ok('b')
136///   instance 2 B Ok('b')
137///   instance 3 B Ok('b')
138/// Done
139/// ```
140#[derive(Default)]
141#[cfg(feature = "alloc")]
142#[cfg_attr(feature = "nightly_doc", doc(cfg(feature = "alloc")))]
143pub struct CoroRun<T, E> {
144    #[allow(clippy::type_complexity)]
145    coros: VecDeque<Pin<Box<dyn Future<Output = OptRes<T, E>>>>>,
146}
147
148#[cfg(feature = "alloc")]
149impl<T, E: 'static + Debug> CoroRun<T, E> {
150    /// Returns a new empty runner.
151    pub fn new() -> Self {
152        CoroRun { coros: VecDeque::new() }
153    }
154
155    /// Adds a closure to the runner.
156    pub fn push<C, F>(&mut self, closure: C)
157    where
158        C: FnOnce(Coro<T, E>) -> F,
159        F: Future<Output = OptRes<T, E>> + 'static,
160    {
161        self.coros.push_back(Box::pin(closure(Coro::new())));
162    }
163
164    /// Runs all the coroutines to completion.
165    pub fn run(&mut self) {
166        let waker = TaskWaker::noop();
167        let mut context = TaskContext::from_waker(waker);
168
169        while let Some(mut cor) = self.coros.pop_front() {
170            let polled = cor.as_mut().poll(&mut context);
171            // println!("  coroutine polled:");
172
173            match polled {
174                TaskPoll::Pending => {
175                    // println!("  - pending, push back");
176                    self.coros.push_back(cor);
177                }
178                TaskPoll::Ready(_result) => {
179                    // println!("  - READY");
180                    // if let Some(Err(err)) = result {
181                    //     // eprintln!("    Error in coroutine: {:?}", err);
182                    // }
183                }
184            }
185        }
186    }
187}