ОРы: Знает как работает трейт Future. Знает, как компилятор преобразует асинхронный код. Понимает, зачем нужен Pinning. Умеет использовать трейт-объекты Box\. Понимает, как асинхронный рантайм выполняет задачи. Может написать свою реализацию Future. Пример идеи фьючерсов и полинга на примере общения/пинга менеджера ## Асинхронность под капотом io - epoll + timeout time / nothing - futex + timeout ## Трейт Future ## Start В прошлом уроке мы узнали, как писать асинхронный код в rust. Но как же он работает под капотом? Выполнение асинхронного кода происходит по запросу, выглядит примерно так: Происходит запрос к Future, может ли он сейчас выполнить работу и выдать результат. Также Future получает референс на контекст, который ## Концепция асинхронности в Rust В rust асинхронные функции выполняются лениво. При вызове асинхронной функции они выполняют различные вычисления, до того, пока не произойдет io-bound задачи, на которой они передадут выполнение исполнителю, а выполнение функции продолжится когда она вызовет waker Если функция не может вернуть окончательное значение, При этом вызов waker означает, что значение может быть готово В основе асинхронности в rust лежит трейт Future. Future — это значение, которое ещё может быть не вычислено до конца. ## Ignore this: ## --- ## Интерфейс асинхронной функции Каждая асинхронная функция в rust превращается в обычную функцию, которая выдает некоторую машину состояний (создается приватный тип, который известен только компилятору, примерно как с сокрытиями), для которой реализован трейт Future. ## --- ## Start 2 В прошлом уроке мы узнали про то, что асинхронность в rust реализована через кооперативную многозадачность, а значит задачи должны cами передавать управление потоком. Но как же это происходит в асинхронной функции? Ведь в ней мы делаем возврат из функции только с готовыми значениями. На самом деле async/await это лишь синтаксический сахар, и компилятор сильно преобразует то, что написано в асинхронной функции. ## Трейт Future ОР - Знает как работает трейт Future. Для начала мы разберем трейт Future, который позволяет реализовать кооперативность. Его интерфейс весьма прост и состоит лишь из одного метода poll. Вот определение трейта: ```rust pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; } ``` poll означает запрос, может ли future выдать прямо сейчас готовое значение. Если да, то возвращается значение Poll::Ready(val), если нет, то Poll::Pending (подобно на Option). Возврат Poll::Pending и является моментом передачи управления потоком от задачи исполнителю и означает, что нужный для исполнения ресурс пока не доступен и нужно подождать (к примеру, попытка подключения к сокету и ожидание, пока соединение установится). Но сколько же исполнителю нужно ждать? Для этого в метод poll передается специальная структура `Waker`, через которую исполнителя можно уведомить, что возможно выполнить прогресс по задаче, вызвав `Waker.wake()` или `Waker.wake_by_ref()`. Получить `Waker` можно через `Context`, переданный в метод poll, вызвав `Context.waker()` (пока `Context` используется только для получения `Waker`). Этот `Waker` уже обычно передается куда-нибудь, что сможет вызывать `Waker` по возможной готовности ресурса (потому что сам future уже вернул управление и никак не может выполнить никакой код). Трейт Future реализован для всех асинхронных функций. На самом деле, когда мы вызываем метод tokio::runtime::Runtime::block_on, мы просто вызываем poll до тех пор, пока он не вернет Poll::Ready(val). Квиз ОР - Знает как работает трейт Future. Какой метод вызывается, если исполнитель знает, что можно сделать прогресс у задачи? 1. Future::poll Верно, это метод, который обозначает запрос и позволяет узнать, выполнена ли до конца задача 2. Waker::wake_by_ref Нет, хоть эта функция и создана для уведомления о возможности прогресса по задаче, сделана она концептуально для вызова со стороны задачи для уведомления исполнителя о готовности 3. Poll::Pending Нет, это возвращаемое значение, которое означает, что задача еще до конца не выполнена ## Практика ОР - Понимает, как асинхронный рантайм выполняет задачи. Попробуем написать свой мини-tokio. Для этого создадим новый проект через cargo, добавив в зависимости waker-fn и smol. smol - это рантайм, очень похожий на tokio, но в отличии от него, позволяет исполняться своим функциям не в своих рантаймах. Нам в данном примере от него нужны только асинхронные функции для I/O. Для примера, попробуем выполнить простую асинхронную функцию: ```rust async fn read_toml() -> String { smol::fs::read_to_string("./Cargo.toml").await.unwrap() } ``` Пока эту функцию можно воспринимать аналогично этой: ```rust fn read_toml() -> impl Future { async { smol::fs::read_to_string("./Cargo.toml").await.unwrap() } } ``` То есть, при вызове `read_toml()`, мы получим что-то, что реализует трейт Future (это будет приватный тип, известный только компилятору). На полученном значении мы и будем вызывать poll: ```rust use std::task::{Context, Waker}; fn main() { // Создание контекста с Waker, который при вызове ничего не делает let waker = Waker::noop(); let mut cx = Context::from_waker(waker); let future = read_toml(); // Про Pin будет ниже, пока это расматривайте как требование для поллинга фьючерсов let mut future = std::pin::pin!(future); loop { match future.as_mut().poll(&mut cx) { std::task::Poll::Pending => { println!("Pending future"); } std::task::Poll::Ready(value) => { println!("Ready! value:\n{value}"); break; } } } } ``` Попробуем запустить код. Он успешно вывел файл! Но у нашего рантайма есть проблема - он никогда не ожидает готовности ресурсов, что приводит к бесмысленной трате процессорного времени. Попробуем это оптимизировать: будем отправлять поток в сон, пока не вызовется Waker: ```rust use std::task::{Context, Waker}; use waker_fn::waker_fn; async fn read_toml() -> String { smol::fs::read_to_string("./Cargo.toml").await.unwrap() } fn main() { let (waker, wait) = make_waker(); let mut cx = Context::from_waker(&waker); let future = read_toml(); let mut future = std::pin::pin!(future); loop { match future.as_mut().poll(&mut cx) { std::task::Poll::Pending => { println!("Pending future"); } std::task::Poll::Ready(value) => { println!("Ready! value:\n{value}"); break; } } wait(); } } /// Returns (waker, wait) fn make_waker() -> (Waker, impl Fn()) { let t = std::thread::current(); let waker = waker_fn(move || { t.unpark(); }); let wait = move || { // Go to sleep and wait std::thread::park(); }; (waker, wait) } ``` (Здесь для упрощения примера используется крейт waker-fn, но Waker можно собирать и без него) При исполнении результат будет таким же, но poll не будет вызываться бессмысленно кучу раз. Если сравнить вывод, то теперь `Pending future` вывелся только один раз, хотя в прошлый раз он вывелся очень много раз. Заметьте, это не означает, что poll всегда возвращает Pending только один раз, это значение может возвращаться сколь угодно раз, пока не получится вернуть значение Ready. Данный пример сильно упрощен. На деле рантаймы не просто засыпают, а делают определенный syscall, который вернется при доступности одного из запрошенных ресурсов Квиз ОР - Понимает, как асинхронный рантайм выполняет задачи. Мульти: Когда рантайм вызовет Future::poll? 1. В первый раз по возможности Правильно, для первого вызова poll ничего делать не надо 2. Сразу после предыдущего вызова poll Нет, рантайм будет ждать, пока не вызовется waker 3. Когда не будет других задач для выполнения Нет, рантайм без задач рантайм отправится в ожидание 4. После вызова Waker когда исполнитель освободится Правильно, после этого вызова исполнитель будет знать, что в исполнении задачи может произойти прогресс, и вызовет тогда, когда освободится от других задач 5. Сразу после вызова Waker Нет, poll вызовется только тогда, когда исполнитель освободится от других задач ## Трейт Future ОР - Знает как работает трейт Future. ОР - Может написать свою реализацию Future. А теперь подойдем к реализации асинхронности со стороны задачи. Попробуем написать свое асинхронное ожидание промежутка времени используя трейт Future. Создадим функцию ```rust fn wait_for(duration: Duration) -> impl Future { WaitFor { duration } } struct WaitFor { duration: Duration, } ``` Теперь попробуем реализовать Future. Пускай для начала просто возвращает Pending: ```rust impl Future for WaitFor { type Output = (); fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { std::task::Poll::Pending } } ``` Заметьте, как функция `wait_for` по определению похожа на функцию `read_toml`, что мы определяли ранее. Из-за того, что мы возвращаем фьючерс, это функция является асинхронной. Поэтому мы можем вызывать на ней .await! (который, по сути, является просто синтаксическим сахаром для поллинга) Для этого напишем такой main: ```rust #[tokio::main] async fn main() { println!("Before wait"); wait_for(std::time::Duration::from_secs(2)).await; println!("After wait"); } ``` Задание: попробуйте написать то же самое по функциональности, но используя рантайм, который мы только что написали сами Попробуем запустить, программа такой результат: ``` Before wait ``` `After wait` не вывелся, так как Future продолжит исполняться только после вызова Waker (не не обязательно сразу после вызова, так как исполнитель может быть занят другими задачами, и задача не начнет исполняться, пока исполнитель не освободится, так как у нас кооперативная многозадачность). Попробуем вызвать Waker по истечению нужного промежутка времени. Для простоты примера, будет вызываться отдельный поток, но обычно в рантаймах Waker передается в собственный I/O event loop, который вызовет Waker при выполнении нужных условий. ```rust fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let waker = cx.waker().clone(); let duration = self.duration; std::thread::spawn(move || { std::thread::sleep(duration); waker.wake(); }); std::task::Poll::Pending } ``` Попробуем вызвать, но в консоли все еще выводится только `Before wait`. Почему так происходит? На самом деле наш poll вызывается, в данном случае, раз в две секунды (потому что создаем поток, который через 2 секунды вызывает Waker), Но мы никогда не возвращаем Ready. Попробуем это исправить: ```rust struct WaitFor { duration: Duration, waited: bool, } fn wait_for(duration: Duration) -> WaitFor { WaitFor { duration, waited: false } } impl Future for WaitFor { type Output = (); fn poll( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { if self.waited { return std::task::Poll::Ready(()); } self.waited = true; let waker = cx.waker().clone(); let duration = self.duration; std::thread::spawn(move || { std::thread::sleep(duration); waker.wake(); }); std::task::Poll::Pending } } ``` Попробуем запустить: ``` Before wait After wait ``` Ура! 🎉 Теперь мы получили ожидаемый результат. Квиз ОР - Может написать свою реализацию Future. Какой вызов Future всегда является последним? 1. Когда возвращает Self::Output Нет, фьючерс никогда не может просто вернуть Self::Output. Вместо этого по готовности возвращается Poll::Ready(Self::Output) 2. Когда возвращает Poll::Ready(Self::Output) Правильно, после возврата Poll::Ready(Self::Output) фьючерс больше не должен вызываться 3. Когда возвращает Poll::Pending Нет, Poll::Pending возвращается во все вызовы, кроме последнего, в котором возвращается Poll::Ready(Self::Output) 4. Следующий за вызовом Waker Нет, после вызова Waker фьючерс может вызваться сколь угодно раз ## Преобразование синтаксиса async/await в Future ОР - Знает, как компилятор преобразует асинхронный код. Теперь, зная что async/await просто превращается во фьючерс, посмотрим, как это происходит на примере такой функции: ```rust async fn wait() -> usize { tokio::task::yield_now().await; 67 } ``` На самом деле, при компиляции она превратится в некую машину состояний: (это псевдокод, так как написать в точности, как будет скомпилировано, невозможно) ```rust fn wait() -> Wait { Wait::new() } struct Wait { state: u8, yield_now: typeof, } impl Wait { fn new() -> Self { unsafe { std::mem::zeroed() } } } impl Future for Wait { type Output = usize; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { use std::task::Poll; loop { match self.state { 0 => { self.yield_now = tokio::task::yield_now(); self.state += 1; }, 1 => { match self.yield_now.poll() { Poll::Pending => return Poll::Pending Poll::Ready(_) => { self.state += 1; } } }, 2 => { self.state += 1; return Poll::Ready(67) } _ => panic!("wait called after completion") } } } } ``` Квиз ОР - Знает, как компилятор преобразует асинхронный код. Допустим, у нас есть такая функция: ```rust async fn number() -> usize { 42 } ``` Что вернет первый вызов Future::poll на ней? 1. 42 Нет, poll всегда возвращает значение типа Poll, в данном случае вернется Poll::Ready(42) 2. Poll::Pending Нет, в данном случае значение сразу готово, так как нет .await поинтов, и при первом же вызове вернется Poll::Ready(42) 3. Poll::Ready(42) Верно, первый вызов сразу же вернет это значение, так как значение будет сразу готово ## Pin ОР - Понимает, зачем нужен Pinning. Заметили, что poll везде принимает загадочный тип Pin? Давайте разберем данную функцию: ```rust async fn selfreferential() { let value = 1; let refvalue = &value; tokio::task::yield_now().await; println!("value by ref: {}", refvalue); } ``` Как для нее будет выглядеть тип, как из примера выше? ```rust struct Selfreferential { value: u32,<----| // refvalue ссылается на value refvalue: &u32,-| yield_now: typeof, } ``` Что произойдет, если для нее выполнить самое обычное перемещение в памяти? референс refvalue станет невалидным! Ведь референс уже будет указывать на невалидную область памяти. А для раста это недопустимо. Поэтому для вызова асинхронной функции требуется Pin: он принимает указатель на объект и дает гарантии, что объект под этим указателем не будет перемещен в памяти Квиз ОР - Понимает, зачем нужен Pinning. Если бы для вызова асинхронной функции не требовался тип Pin, какая проблема возникала бы? 1. Было бы не безопасно использовать указатели Правильно 2. Асинхронные функции не могли бы вызывать друг друга Нет,вызов не связан с пиннингом. Pin нужен для управления размещением в памяти, а не для вызовов функций. 3. Выполнение асинхронного кода стало бы значительно медленнее Нет, Pin — это механизм безопасности, а не оптимизации. Его наличие или отсутствие не влияет напрямую на скорость выполнения, только на корректность и безопасность доступа к данным 4. Future не смог бы хранить ссылки на другие Future Нет, можно хранить ссылки между Future, если соблюдены правила заимствования. Pin лишь гарантирует, что объект не будет перемещён, но не ограничивает владение ## Асинхронная рекурсия ОР - Умеет использовать трейт-объекты Box\. Допустим, вы решили написать асинхронную рекурсивную функцию. Пускай она выглядит так: ```rust async fn recursion() { recursion().await; } ``` Как будет выглядеть тип для нее (по примеру с Wait из примера выше)? Кажется, вот так: ```rust struct Recursion { state: u8, recursion: Recursion, } ``` Но вложенные структуры невозможны, так как тогда они будут бесконечного размера. Тут на помощь приходит Box: ```rust struct Recursion { state: u8, recursion: Box, } ``` Не забываем, что для вызова нам потом нужен будет Pin, поэтому определение будет такое: ```rust struct Recursion { state: u8, recursion: Pin, } ``` И фьючерс стал конечного размера. В самой функции это будет выглядеть так: ```rust async fn recursion() { Box::pin(recursion()).await; } ``` Квиз ОР - Умеет использовать трейт-объекты Box\. Без какого типа можно создать простую асинхронную функцию, но не создать синхронную? 1. Future Нет, Future нужен для всех асинхронных функций, а еще Future - это трейт 2. Pin Нет, Pin нужен для вызова poll на всех фьючерсах 3. Box Правильно, благодаря нему фьючерс не будет бесконечно большим ## Практика Порой, выполнение функции по времени нужно ограничить. Попробуйте написать такой таймаут! ```rust use std::{ pin::Pin, task::{Context, Poll}, time::Duration, }; use tokio::time::Sleep; #[derive(Debug)] enum TimeoutFuture { Result(O), Timeout, } fn timeouted_read( timeout: Duration, future: F, ) -> TimeoutedFuture<::IntoFuture> { let future = future.into_future(); let sleep = tokio::time::sleep(timeout); TimeoutedFuture { future, sleep } } struct TimeoutedFuture { future: F, sleep: Sleep, } impl TimeoutedFuture { fn future(self: Pin<&mut Self>) -> Pin<&mut F> { unsafe { self.map_unchecked_mut(|s| &mut s.future) } } fn sleep(self: Pin<&mut Self>) -> Pin<&mut Sleep> { unsafe { self.map_unchecked_mut(|s| &mut s.sleep) } } } impl Future for TimeoutedFuture { type Output = TimeoutFuture; fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Релеазуйте метод poll } } // Тесты #[tokio::main] async fn main() { let instant = async { 0 }; let result = timeouted_read(Duration::from_millis(123), instant).await; println!("Result: {result:?}"); // Result(0) let wait100 = async { let delay = 100; tokio::time::sleep(Duration::from_millis(delay)).await; delay }; let result = timeouted_read(Duration::from_millis(123), wait100).await; println!("Result: {result:?}"); // Result(100) let wait150 = async { let delay = 150; tokio::time::sleep(Duration::from_millis(delay)).await; delay }; let result = timeouted_read(Duration::from_millis(123), wait150).await; println!("Result: {result:?}"); // Timeout let never = std::future::pending::(); let result = timeouted_read(Duration::from_millis(123), never).await; println!("Result: {result:?}"); // Timeout } ``` ### Подсказки - Для получение фьчерсов, на которых можно вызвать poll, реализованы помогающие методы TimeoutedFuture::future() и TimeoutedFuture::sleep(). - Чтобы два раза вызвать методы, забирающие владение Pin, можно использовать метод Pin::as_mut - Чтобы проверить, не завершился ли фьючерс, можно вызвать метод Future::Poll - По окончанию выполнения Future::poll возвращает Poll::Ready - Poll::Pending означает, что результат еще не готов Проверьте свой код по чек-листу: Механика квиз-множественный выбор (все ответы верные, без фидб==э==ков) - [ ] Вызывается poll на future - [ ] Вызывается poll на sleep - [ ] При завершении future возвращается значение из нее внутри TimeoutFuture::Result - [ ] При таймауте возвращается TimeoutFuture:Timeout - [ ] Все тесты проходят ### Решение ```rust impl Future for TimeoutedFuture { type Output = TimeoutFuture; fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let future = self.as_mut().future(); match future.poll(cx) { Poll::Ready(output) => return Poll::Ready(TimeoutFuture::Result(output)), Poll::Pending => {} } let sleep = self.as_mut().sleep(); match sleep.poll(cx) { Poll::Ready(()) => Poll::Ready(TimeoutFuture::Timeout), Poll::Pending => Poll::Pending, } } } ``` P.S.: то, что вы написали, альтернативно функции tokio::time::timeout Чтобы проверить, не завершился ли фьючерс, можно вызвать метод Future::Poll По окончанию выполнения Future::poll возвращает Poll::Ready Poll::Pending означает, что результат еще не готов Проверьте свой код по чек-листу: Механика квиз-множественный выбор (все ответы верные, без фидб==э==ков) Вызывается poll на future Вызывается poll на sleep При завершении future возвращается значение из нее внутри TimeoutFuture::Result При таймауте возвращается TimeoutFuture:Timeout Все тесты проходят