devela/work/future/coroutine/coro.rs
1// devela::work::future::coroutine
2//
3//! A minimal single-threaded coroutine implementation.
4//!
5//! This code demonstrates a basic cooperative multitasking system where tasks
6//! can yield control back to the scheduler and be resumed later.
7//!
8//! This is the fundamental concept behind coroutines.
9//!
10//! This is based on:
11//! - <https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/>
12//! - <https://www.reddit.com/r/rust/comments/etqwhx/a_stackless_rust_coroutine_library_under_100_loc/>
13//
14
15use crate::{serr, sok, Debug, Future, OptRes, Pin, TaskContext, TaskPoll};
16#[cfg(feature = "alloc")]
17use crate::{Box, TaskWaker, VecDeque};
18
19/* coroutine */
20
21/// Represents a single thread stackless coroutine.
22///
23/// It has a private status that can be either running or halted.
24#[derive(Clone, Copy, Debug)]
25pub struct Coro<T, E> {
26 status: CoroStatus,
27 result: OptRes<T, E>,
28}
29
/// Private execution status of a [`Coro`].
///
/// A [`CoroYield`] flips between both variants each time it is polled.
#[derive(Clone, Copy, Debug)]
enum CoroStatus {
    /// Set after a poll returned pending; the next poll hands back the result.
    Halted,
    /// The initial status; the next poll halts the coroutine and returns pending.
    Running,
}
36
37impl<T, E> Coro<T, E> {
38 // Returns a new coroutine.
39 #[allow(unused)]
40 const fn new() -> Self {
41 Coro { status: CoroStatus::Running, result: None }
42 }
43
44 /// Yields an [`Ok`] `value` and returns an awaitable CoroYield.
45 pub fn yield_ok(&mut self, value: T) -> CoroYield<'_, T, E> {
46 self.result = sok(value);
47 CoroYield { cor: self }
48 }
49
50 /// Yields an [`Err`] and returns an awaitable future.
51 pub fn yield_err(&mut self, error: E) -> CoroYield<'_, T, E> {
52 self.result = serr(error);
53 CoroYield { cor: self }
54 }
55}
56
57/* yielder */
58
59/// A future that alternates between [`Ready`][TaskPoll::Ready] and
60/// [`Pending`][TaskPoll::Pending] status each time it's polled.
61///
62/// This allows the coroutine to yield control back and be resumed later.
63pub struct CoroYield<'a, T, E> {
64 cor: &'a mut Coro<T, E>,
65}
66
67impl<T, E> Future for CoroYield<'_, T, E> {
68 type Output = OptRes<T, E>;
69
70 fn poll(mut self: Pin<&mut Self>, _cx: &mut TaskContext) -> TaskPoll<OptRes<T, E>> {
71 match self.cor.status {
72 CoroStatus::Halted => {
73 self.cor.status = CoroStatus::Running;
74 if let Some(result) = self.cor.result.take() {
75 match result {
76 Err(error) => TaskPoll::Ready(serr(error)),
77 Ok(value) => TaskPoll::Ready(sok(value)),
78 }
79 } else {
80 unreachable!();
81 }
82 }
83 CoroStatus::Running => {
84 self.cor.status = CoroStatus::Halted;
85 TaskPoll::Pending
86 }
87 }
88 }
89}
90
91/* runner */
92
93/// A managed dynamic collection of single-thread [`Coro`]utines.
94///
/// It maintains a queue of boxed (heap-allocated) coroutines, and runs them
/// in a loop until they are all complete.
97///
98/// When a coroutine is polled and returns [`TaskPoll::Pending`], it is put back
99/// into the queue to be run again later. If it returns [`TaskPoll::Ready`]
100/// it is considered complete and is not put back into the queue.
101///
102/// # Examples
103/// ```
104#[doc = include_str!("../../../../examples/work/coro_run.rs")]
105/// ```
106/// It outputs:
107/// ```text
108/// Running
109/// > instance 1 NEW
110/// > instance 2 NEW
111/// > instance 3 NEW
112/// > instance 4 NEW
113/// instance 1 A.0 Ok('a'))
114/// instance 2 A.0 Ok('a'))
115/// instance 3 A.0 Ok('a'))
116/// instance 1 B Ok('b')
117/// instance 2 B Ok('b')
118/// instance 3 B Ok('b')
119/// instance 1 A.1 Ok('a'))
120/// instance 2 A.1 Ok('a'))
121/// instance 3 A.1 Ok('a'))
122/// instance 4 BYE!
123/// instance 1 B Ok('b')
124/// instance 2 B Ok('b')
125/// instance 3 B Ok('b')
126/// instance 1 A.2 Ok('a'))
127/// instance 2 A.2 Ok('a'))
128/// instance 3 A.2 Ok('a'))
129/// instance 1 B Ok('b')
130/// instance 2 B Ok('b')
131/// instance 3 B Ok('b')
132/// instance 1 A.3 Ok('a'))
133/// instance 2 A.3 Ok('a'))
134/// instance 3 A.3 Ok('a'))
135/// instance 1 B Ok('b')
136/// instance 2 B Ok('b')
137/// instance 3 B Ok('b')
138/// Done
139/// ```
#[cfg(feature = "alloc")]
#[cfg_attr(feature = "nightly_doc", doc(cfg(feature = "alloc")))]
pub struct CoroRun<T, E> {
    // The queued coroutines, in the order they will be polled next.
    #[allow(clippy::type_complexity)]
    coros: VecDeque<Pin<Box<dyn Future<Output = OptRes<T, E>>>>>,
}

// Implemented by hand because `#[derive(Default)]` would require
// `T: Default` and `E: Default`, which an empty queue does not need.
#[cfg(feature = "alloc")]
#[cfg_attr(feature = "nightly_doc", doc(cfg(feature = "alloc")))]
impl<T, E> Default for CoroRun<T, E> {
    /// Returns a new empty runner.
    fn default() -> Self {
        CoroRun { coros: VecDeque::new() }
    }
}
147
#[cfg(feature = "alloc")]
impl<T, E: 'static + Debug> CoroRun<T, E> {
    /// Creates a runner with an empty queue.
    pub fn new() -> Self {
        Self { coros: VecDeque::new() }
    }

    /// Calls `closure` with a fresh [`Coro`] and queues the future it returns.
    pub fn push<C, F>(&mut self, closure: C)
    where
        C: FnOnce(Coro<T, E>) -> F,
        F: Future<Output = OptRes<T, E>> + 'static,
    {
        let future = closure(Coro::new());
        self.coros.push_back(Box::pin(future));
    }

    /// Runs all the queued coroutines to completion.
    ///
    /// Coroutines are taken from the front of the queue and polled with a
    /// no-op waker. A pending coroutine goes to the back of the queue to be
    /// polled again later; a ready one is dropped and its result discarded.
    ///
    /// This only returns once the queue is empty.
    pub fn run(&mut self) {
        let mut context = TaskContext::from_waker(TaskWaker::noop());

        while let Some(mut cor) = self.coros.pop_front() {
            if let TaskPoll::Pending = cor.as_mut().poll(&mut context) {
                self.coros.push_back(cor);
            }
        }
    }
}
187}