==Умеет порождать конкурентные задачи через spawn/join/select. Асинхронность позволяет эффективно управлять многозадачностью, как и многопоточная программа. Но отличие от многопоточного синхронного кода, асинхронный код позволяет выполнять несколько операций одновременно в одном потоке, что позволяет приложениям, работающих с сетью или файловыми системами, использовать вычислительные ресурсы эффективнее. В этой теме мы рассмотрим два типа многозадачности: кооперативную и вытесняющую. Также мы научимся использовать синтаксис async/await, который используется для написания асинхронного кода в Rust. Вы узнаете, как компилятор обрабатывает асинхронный код и что происходит под капотом. Научимся запускать асинхронные задачи используя tokio, работать с неблокирующими примитивами для сети и файлов, а также порождать конкурентные задачи через spawn/join/select. Всё это поможет создавать быстрые и эффективные асинхронные приложения. ОР — Знает разницу между кооперативной и вытесняющей многозадачностью Допустим, перед вами стоит несколько задач, которые надо выполнить параллельно, но фокусироваться в один момент вы можете только одной задаче. По какой стратегии вы будете переключаться между этими задачами? ## Кооперативная и вытесняющая многозадачность Одним из вариантов, который может прийти в голову, это сделать небольшой прогресс в одной задаче, потом переключиться на вторую, сделать в ней небольшой прогресс, и так далее. Такая многозадачность, когда исполнитель решает момент, когда нужно переключаться между задачами, называется **вытесняющей** (так как одна задача как бы вытесняет другую). По такому принципу распределяют задачи на процессор все современные десктопные ОС. А теперь представьте, что момент переключения на другую задачу решаете не вы, а сама задача. Такая многозадачность называется **кооперативной** (так как задачам приходится кооперировать друг с другом за процессорное время). И у кооперативной многозадачности есть несколько преимуществ. Во первых, реализация такой многозадачности проще, что позволяет быстрее переключаться между задачами. Второе, это упрощение синхронизации по памяти: задачи сами управляют моментом переключения, так что они успевают завершить запись в память, а при вытесняющей переключение может произойти посреди записей и память будет невалидной для других задач. Квиз-Одиночный выбор В чем отличие кооперативной многозадачности от вытесняющей? Исполнитель решает, когда переключаться между задачами Нет, в кооперативной многозадачности момент переключения решают задачи, а не исполнитель Задачи переключаются по истечению времени Нет, задачи могут выполняться сколько угодно Задачи решают, когда можно переключиться с них Правильно, только задачи решают, когда переключаться и могут выполняться сколько угодно Все задачи выполняются только в один поток Нет, такого требования к кооперативности нет ## Многозадачность в асинхронном rust ОР — Знает преимущества кооперативной многозадачности В rust для асинхронного кода выбрана кооперативная многозадачность. Такой выбор обоснован тем, что переключение между задачами в кооперативной многозадачности существенно быстрее, и дешевле по производительности обходится безопасность по памяти. Если бы многозадачность была вытесняющей, то при переключении задач пришлось бы выполнять дополнительные действия для сохранения состояния задачи, в то время как в кооперативной задачи сами отвечают за свое состояние. При этом у кооперативной многозадачности есть и недостатки: если задача зависает, то другие задачи не могут выполняться. Поэтому во всех современных десктопных ОС выбрана именно вытесняющая многозадачность, ведь иначе какой-нибудь процесс случайно или злонамеренно мог бы повесить всю систему. И поэтому при написании асинхронных функций нужно следить за тем, как долго они выполняются, и если возникает хоть немного занимающая время задача, ее нужно выносить в отдельный поток (в рантаймах есть встроенные инструменты для этого) Квиз-Множественный выбор Выберите преимущества кооперативной многозадачности перед вытесняющей. Задачи выполняются в порядке создания Нет, задачи могут выполняться в любом порядке Быстрее переключение между задачами Да, реализация переключения при кооперативной многозадачности проще и быстрее, чем при вытесняющей Идеальная предсказуемость времени выполнения задачи Нет, кооперативная многозадачность не гарантирует идеальную предсказуемость выполнения. Более того, если одна из других задач будет долго исполняться, выполнение задачи тоже затянется Из-за одной неправильной задачи случайно не остановятся другие Нет, это преимущество вытесняющей многозадачности Быстрее безопасное обращение к памяти Да, благодаря предсказуемости момента переключения, задача успеет закончить свое обращение к памяти ## Синтаксис async/await ОР — Умеет использовать async/await синтаксис Для кооперативной многозадачности в самих задачах нужна логика для передачи управления. Для этого в rust существует синтаксис async/await, он выглядит так: ``` // создание асинхронной функции async fn task1() { do_something().await; // вызов асинхроной функции и ожидание результата от нее } ``` Сам по себе вызов такой функции (к примеру, просто `do_something()`) ничего не сделает. Для вызова асинхронной функции нужно использовать .await, если вызывать из асинхронной функции, либо вызывать из рантайма. Для асинхронных замыканий тоже добавляется ключевое слово async: ``` let closure = async || { do_something().await; } ``` Или, если нет аргументов, можно опустить `||`: ``` let closure = async { do_something().await; } ``` Кстати, так же, как и для синхронных сокрытий существуют трейты FnOnce, FnMut и Fn, для асинхронных реализованы AsyncFnOnce, AsyncFnMut, AsyncFn аналогично Квиз-Одиночный выбор Что выведет данный код: ``` fn main() { async { println!("42"); } } ``` Не скомпилируется из-за синтаксической ошибки Нет, данный код полностью валидный и скомпилируется Не скомпилируется, так как асинхронный код нельзя вызывать из синхронной функции Нет, в данном случае просто создается асинхронное замыкание и ничего не вызывается через .await, поэтому код ничего не выведет Ничего Правильно, данный код просто создаст асинхронное замыкание, без его вызова Выведет в консоль 42 Нет, данный лишь создаст асинхронное замыкание, но не вызовет его, поэтому на экран не будет ничего выведено ОР — Умеет настраивать и запускать tokio runtime через макрос ОР — Умеет настраивать и запускать tokio runtime вручную В rust нет встроенного рантайма, они все реализованы в виде библиотек. Самыми популярными являются tokio и smol (раньше еще был async-std, но он теперь discontinued). В данном уроке остановимся на tokio, так как он самый популярный. Чтобы запустить асинхронный код с помошью tokio, можно использовать макрос tokio::main ``` #[tokio::main] async fn main() { do_something().await; } ``` По стандартным настройкам этом макрос cоздает многопоточный рантайм, поэтому он эквивалентен такому коду: ``` fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { do_something().await; }) } ``` Данный код создает многопоточный рантайм tokio, запускает и ожидает завершения функции, переданной в Builder::block_on `.enable_all()` отвечает за возможность использования всего IO tokio и tokio::time Многопоточность в макросе указать явно можно так: ``` #[tokio::main(flavor = "multi_thread")] async fn main() { do_something().await; } ``` Так же в макросе можно указать количество потоков, на которых будут запускаться асинхронные функции (по умолчанию их столько же, сколько потоков в процессоре в системе): ``` #[tokio::main(flavor = "multi_thread", worker_threads = 8)] async fn main() { //... } ``` То же самое, но без макроса: ``` fn main() { tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_all() .build() .unwrap() .block_on(async { //... }) } ``` Чтобы рантайм был однопоточным, можно в макросе указать параметр flavor: ``` #[tokio::main(flavor = "current_thread")] async fn main() { //... } ``` То же самое, но без макроса: ``` fn main() { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(async { //... }) } ``` Квиз-Одиночный выбор На скольких потоках будут запускаться асинхронные функции, если просто использовать макрос `#[tokio::main]`: 1 Нет, по умолчанию tokio будет запускать задачи на стольких потоках, сколько потоков у процессора в системе Ошибка, нужно указать явно Нет, указывать явно не нужно, по умолчанию tokio будет запускать задачи на стольких потоках, сколько потоков у процессора в системе 8 Нет, по умолчанию tokio будет запускать задачи на стольких потоках, сколько потоков у процессора в системе Столько, сколько потоков в процессоре на данной системе Правильно, без явного указания потоков будет столько же, сколько их в процессоре в системе ## Конкурентность А теперь, попробуем понаписать асинхронный код. Пускай у нас будет две такие функции: ```rust use tokio::time::sleep; use std::time::Duration; async fn task1() { sleep(Duration::from_millis(1000)).await; print("task1"); } async fn task2() { sleep(Duration::from_millis(500)).await; print("task2"); } ``` Попробуем их вызвать? ```rust #[tokio::main] async fn main() { let res1 = task1().await; let res2 = task2().await; } ``` Попробуйте запустить. Выполнение происходит последовательно. А как нам вызвать их так, чтобы задачи выполнялись параллельно? На помощь вам придут spawn, {{join и select}}[join и select не зависят от рантайма, поэтому представлены в tokio, futures, smol, а join еще и в стандартной библиотеке (пока эксперементально)]. Каждый из них выполняет разные задачи, но одинаковую роль: параллельное исполнение задач, давайте разберем каждого из них! ### join! Макрос join ожидает окончания выполнения всех фьючерсов, переданных ему, а потом возвращает результат их всех ```rust #[tokio::main] async fn main() { let (res1, res2) = tokio::join!( task1(), task2(), ) } ``` А еще, в библиотеке futures, есть функция join_all, которая делает то же самое, но на вход она принимает итератор по фьючерсам: ```rust use futures::future::join_all; async fn foo(i: u32) -> u32 { i } let futures = vec![foo(1), foo(2), foo(3)]; assert_eq!(join_all(futures).await, [1, 2, 3]); ``` ### select! Макрос select ожидает окончания выполнения одного из фьючерсов, и возвращает его значение, которое можно обработать, как в синтаксисе match: ```rust #[tokio::main] async fn main() { let res = tokio::select! { res1 = task1() => { // возможность выполнения логики res1 }, res2 = task2() => res2, }; } ``` При этом, выполнение второго фьючерса ==отменится==. (Кстати, про то, какие фьючерсы не стоит отменять будет рассказано в следующем уроке). К примеру, с помощью select можно отменять ожидание UDP пакета с таймаутом: ```rust use tokio::net::UdpSocket; use tokio::time::sleep; use std::io; #[tokio::main] async fn main() -> io::Result<()> { let sock = UdpSocket::bind("0.0.0.0:8080").await?; let mut buf = [0; 1024]; loop { tokio::select! { data = sock.recv_from(&mut buf) => { let (len, addr) = data?; println!("{:?} bytes received from {:?}", len, addr); } _ = sleep(Duration::from_secs(10)) => {} } } } ``` ### spawn Функция spawn добавит задачу в очередь задач tokio, и при свободном исполнителе, запустит задачу. А если текущий рантайм tokio многопоточный, то это позволит использовать многопоточность, так как задачи, созданные через spawn, tokio может распределять между потоками. Так же возвращает JoinHandle, аналогичные по функционалу JoinHandle из стандартной библиотеки. Но, функцию невозможно выполнить из других рантаймов. ```rust #[tokio::main] async fn main() { let handle = tokio::spawn(async { println!("spawned"); }); // main может напечататься и до spawned, и после // то есть примерно такое же поведение, как и у многопоточности println!("main"); // Дождаться завершения можно вызвав .await, // (как .join() в std::thread::JoinHandle) handle.await.unwrap(); } ``` Квиз-Одиночный выбор Что произойдет с остальными фьючерсами, переданными в select, кроме того, из которого получили значение? 1. Будет ожидание их завершения Нет, их исполнение отменится 2. Отменятся Правильно, их исполнение отмениться (вызовется) 3. Продолжат исполняться параллельно Нет, их выполнение отменится 4. Останутся в том состоянии, котором были Нет, их выполнение отменится Квиз-Одиночный выбор Что делает макрос tokio::join? 5. join возвращает итератор значений, возвращаемых из фьючерсов Нет, он вернет все значения после окончания исполнения фьючерсов 6. Будет ожидание завершения всех фьючерсов, потом вернутся все значения Правильно, он вернет все значения 7. Ждет выполнения одного фьючерса, потом возвращает его значение, а остальные отменяет Нет, таково поведение макроса select, join же ожидает выполнения всех фьючерсов 8. Добавит фьючерсы в очередь задач tokio Нет, такова роль функции tokio::spawn. join будет просто ожидать выполнения всех фьючерсов одновременно ## Практика: мини-бэкенд Попробуйте написать функцию handle_connections, которая будет выполнять переданные ей фьючерсы с помощью tokio::spawn: ```rust use futures::future::join_all; use std::time::Duration; use tokio::time::sleep; // напишите здесь асинхронную функцию handle_connections // Тесты #[tokio::main] async fn main() { use std::time::Instant; let connections = { let mut connections = Vec::new(); for i in 0..10 { let connection = async move { sleep(Duration::from_millis(100)).await; println!("Hello from connection {i}"); }; connections.push(connection); } connections }; let start = Instant::now(); handle_connections(connections).await; let end = start.elapsed(); assert!(end < Duration::from_millis(500)) } ``` ### Подсказки - tokio::spawn запускает фьючерс без вызова .await - При завершении программы все потоки завершатся без ожидания задач в них - .await на JoinHandle, полученом от tokio::spawn, будет ожидать завершения потока - join_all позволяет ожидать выполнения всех фьючерсов в переданном итераторе Проверьте свой код по чек-листу: Механика квиз-множественный выбор (все ответы верные, без фидб==э==ков) - [ ] Используется tokio::spawn - [ ] Задачи выполняются параллельно - [ ] 10 раз печатается `Hello from connection {i}` - [ ] Тест проходит ### Решение ```rust async fn handle_connections(connections: I) where ::Output: Send + 'static, I: IntoIterator, F: Future + Send + 'static, { let mut handles = Vec::new(); for connection in connections.into_iter() { let handle = tokio::spawn(connection); handles.push(handle); } join_all(handles).await; } ``` tokio::spawn запускает фьючерс без вызова .await При завершении программы все потоки завершатся без ожидания задач в них .await на JoinHandle, полученом от tokio::spawn, будет ожидать завершения потока join_all позволяет ожидать выполнения всех фьючерсов в переданном итераторе Проверьте свой код по чек-листу: Механика квиз-множественный выбор (все ответы верные, без фидб==э==ков) Используется tokio::spawn Задачи выполняются параллельно 10 раз печатается `Hello from connection {i}` Тест проходит