190 lines
14 KiB
Markdown
190 lines
14 KiB
Markdown
В прошлом уроке мы разобрали, как устроена асинхронность в rust под капотом и как рантайм вызывает код. В этом уроке мы окунемся поглубже в то, как в tokio происходит ожидание готовностей ресурсов, то, как работать с асинхронным вводом/выводом, как делать асинхронные сетевые запросы, работать с файловой системой, и как использовать неблокирующие примитивы синхронизации
|
||
|
||
## Асинхронные чтение/запись
|
||
ОР - Умеет работать с асинхронным абстрактным вводом/выводом.
|
||
В rust для чтения/записи всего (к примеру, файлов, или сокетов) есть трейты Read и Write, но свои задачи они выполняют синхронно. Для асихронного мира существуют трейты AsyncRead и AsyncWrite. Их определение выглядит так:
|
||
```rust
|
||
pub trait AsyncRead {
|
||
// Required method
|
||
fn poll_read(
|
||
self: Pin<&mut Self>,
|
||
cx: &mut Context<'_>,
|
||
buf: &mut ReadBuf<'_>,
|
||
) -> Poll<Result<()>>;
|
||
}
|
||
```
|
||
```rust
|
||
pub trait AsyncWrite {
|
||
// Required methods
|
||
fn poll_write(
|
||
self: Pin<&mut Self>,
|
||
cx: &mut Context<'_>,
|
||
buf: &[u8],
|
||
) -> Poll<Result<usize, Error>>;
|
||
fn poll_flush(
|
||
self: Pin<&mut Self>,
|
||
cx: &mut Context<'_>,
|
||
) -> Poll<Result<(), Error>>;
|
||
fn poll_shutdown(
|
||
self: Pin<&mut Self>,
|
||
cx: &mut Context<'_>,
|
||
) -> Poll<Result<(), Error>>;
|
||
|
||
// Provided methods
|
||
fn poll_write_vectored(
|
||
self: Pin<&mut Self>,
|
||
cx: &mut Context<'_>,
|
||
bufs: &[IoSlice<'_>],
|
||
) -> Poll<Result<usize, Error>> { ... }
|
||
fn is_write_vectored(&self) -> bool { ... }
|
||
}
|
||
```
|
||
Можно по определению методов заметить, что поведение у них идентично Future::poll - если могут выполнить чтение/запись прямо сейчас, то возвращают Poll::Ready, а если нет, то Poll::Pending. Но, с этими трейтами пользователи библиотек не взаимодействуют, для всех типов, для которых реализованы AsyncRead/AsyncWrite реализованы и трейты AsyncReadExt/AsyncWriteExt соответственно. У них интерфейс уже почти идентичен std::io::{Read,Write}. К примеру у AsyncReadExt:
|
||
```rust
|
||
pub trait AsyncReadExt: AsyncRead {
|
||
// Provided methods
|
||
fn chain<R>(self, next: R) -> Chain<Self, R>
|
||
where Self: Sized,
|
||
R: AsyncRead { ... }
|
||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
|
||
where Self: Unpin { ... }
|
||
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
|
||
where Self: Unpin,
|
||
B: BufMut + ?Sized { ... }
|
||
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
|
||
where Self: Unpin { ... }
|
||
fn read_u8(&mut self) -> ReadU8<&mut Self>
|
||
where Self: Unpin { ... }
|
||
// и другие методы
|
||
}
|
||
```
|
||
Эти трейты позволяют писать асинхронные чтение/запись почти идентично синхронным версиям, с одним только отличием - добавлением await
|
||
## Взаимодействие с интернетом и фс
|
||
ОР - Может использовать неблокирующее сетевое взаимодействие (TCP, UDP) и работу с файловой системой.
|
||
Чтобы делать сетевые запросы в tokio реализованы TcpStream, UdpSocket и TcpSocket (асинхронные аналоги таких же синхронных версий из стандартной библиотеки)
|
||
Давайте попробуем написать небольшой http клиент:
|
||
```rust
|
||
use tokio::net::TcpStream;
|
||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||
|
||
#[tokio::main]
|
||
async fn main() -> std::io::Result<()> {
|
||
// Подключение к http://google.com
|
||
let mut stream = TcpStream::connect(("google.com", 80)).await?;
|
||
|
||
// Отправка GET запроса
|
||
stream.write_all(b"GET / HTTP/1.1\r\nHost: google.com\r\n\r\n").await?;
|
||
|
||
// Упрощено для примера, но обычно так не делают
|
||
// Работает для этого примера, так как
|
||
// гугл отсылает ответ одним tcp пакетом
|
||
let mut buf = [0u8; 1024];
|
||
let len = stream.read(&mut buf).await?;
|
||
let buf = buf[..len].to_vec();
|
||
let output = String::from_utf8(buf).unwrap();
|
||
|
||
println!("Returned:\n{output}");
|
||
Ok(())
|
||
}
|
||
```
|
||
Как видите, интерфейс абсолютно такой же, как и у синхронных трейтов в стандартной библиотеке, только добавляется .await после вызова.
|
||
Для работы с файловой системой все аналогично:
|
||
```rust
|
||
#[tokio::main]
|
||
async fn main() -> std::io::Result<()> {
|
||
let cargo = tokio::fs::read_to_string("./Cargo.toml").await?;
|
||
println!("Cargo.toml content:\n{cargo}");
|
||
Ok(())
|
||
}
|
||
```
|
||
|
||
ОР - Умеет использовать неблокирующие примитивы синхронизации и каналы разных видов.
|
||
Так же есть похожие на стандартные каналы:
|
||
`tokio::sync::mpsc::unbounded_channel()` - альтернатива `std::sync::mpsc::channel()` из стандартной библиотеки, но при ожидании результата не блокирует поток, а передает управление
|
||
`tokio::sync::mpsc::channel(buffer: usize)` - альтернатива `std::sync::mpsc::sync_channel()` из стандартной библиотеки, но при заполнении буфера не блокирует поток, а передает управление
|
||
И примитивы синхронизации:
|
||
`tokio::sync::Mutex` - альтернатива `std::sync::Mutex`, но не блокирует поток при вызове lock, а еще не будет происходить дедлока при пересечении .await. Если один фьючерс залочил значение в стандартном мьютексе, потом вызвал асинхронную функцию, и другому фьючерсу понадобилось залочить этот мьютекс, то возникнет дедлок. Поэтому, если MutexGuard проходит через .await, нужно использовать именно `tokio::sync::Mutex`
|
||
`tokio::sync::RwLock` - те же преимущества, что и у мьютекса
|
||
|
||
Квиз
|
||
ОР - Умеет использовать неблокирующие примитивы синхронизации и каналы разных видов.
|
||
Чем отличается tokio::sync::mpsc::unbounded_channel() от стандартного std::sync::mpsc::channel()?
|
||
Он поддерживает передачу только Send + Sync типов
|
||
Нет, это является требованием для обоих каналов
|
||
Он использует общий буфер для всех задач
|
||
Нет, у каждого канала свой буфер
|
||
Он не блокирует поток при ожидании, а передаёт управление исполнителю
|
||
Правильно, тем самым исполнитель не простаивает на ожидании
|
||
Он автоматически очищает буфер после каждого await
|
||
Нет, очистка буфера не зависит от вызова await
|
||
|
||
Квиз
|
||
Что произойдет, если использовать std::sync::Mutex внутри async-функции и удерживать его guard через .await?
|
||
Мьютекс автоматически разблокируется при .await
|
||
Нет, мьютекс не знает ничего про асинхронность
|
||
Может возникнуть дедлок
|
||
Правильно, после переключения к мьютексу может обратиться другой фьючерс и возникнет дедлок
|
||
Компилятор выдаст ошибку
|
||
Нет, компилятор не выдаст ошибку. Но при этом есть линтеры, которые укажут о прохождение мьютекса из стандартной библиотеки через await
|
||
|
||
Квиз
|
||
Что произойдет при переполнении буфера у `tokio::sync::mpsc::channel()`?
|
||
Поток заблокируется до освобождения места
|
||
Нет, если буфер заполнен, текущая задача приостанавливается (`.await`), но поток может продолжить выполнение других задач. Это ключевое отличие от `std::sync::mpsc::sync_channel`
|
||
Новые сообщения будут теряться
|
||
Нет, каналы не теряют сообщения
|
||
Задача временно уступит управление (await), пока не появится место
|
||
Правильно! При переполнении буфера отправка сообщения асинхронно приостанавливает текущую задачу (`.await`), позволяя планировщику выполнять другие задачи до освобождения места в канале
|
||
Произойдет паника
|
||
Нет, каналы при переполнении не паникуют
|
||
|
||
## Таймаут по вызову Future
|
||
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety.
|
||
Для ожидания фьючерса с таймаутом в токио есть удобная функция tokio::time::timeout
|
||
```rust
|
||
if let Err(_) = tokio::time::timeout(Duration::from_secs(1), std::future::pending()).await {
|
||
println!("не выполнилось в течении 1 секунды");
|
||
}
|
||
```
|
||
По истечении указанного времени исполнение Future отменяется
|
||
## Вызов cpu-bound кода из асинхронного
|
||
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety.
|
||
Так как асинхронность в rust кооперативная, возвращать управление потоком нужно как можно быстрее. Но что если у нас вместо io-bound задачи возникает cpu-bound задача? Тогда задачу нужно вымещать в отдельный thread pool. В токио есть встроенный (который упоминается как worker threads), его удобство заключается в том, что ожидать результата синхронноой функции можно через await, не блокируя исполнения, как с ожиданием у обычного thread pool
|
||
```rust
|
||
let res = tokio::task::spawn_blocking(move || {
|
||
// cpu-bound задачи. К примеру, перемножение матриц
|
||
multipy_matrixes()
|
||
}).await?;
|
||
```
|
||
Самое интересное, что большинство функций в модуле tokio::fs это такие обертки spawn_blocking над синхронными версиями. К примеру, tokio::fs::read_to_string, что мы использовали ранее в самом токио выглядит так:
|
||
```rust
|
||
pub async fn read_to_string(path: impl AsRef<Path>) -> io::Result<String> {
|
||
let path = path.as_ref().to_owned();
|
||
// asyncify - вызывает внутри spawn_blocking,
|
||
// просто с дополнительной небольшой проверкой
|
||
asyncify(move || std::fs::read_to_string(path)).await
|
||
}
|
||
```
|
||
Это обусловленно тем, у некоторых операционных систем блокирующие чтения файлов. Сетевые же запросы, к примеру, сделаны асинхронно
|
||
|
||
## Cancelation safety
|
||
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety.
|
||
Cancellation safety — это свойство future, которое гарантирует, что если её отменить (вызов drop), это не приведёт к потере данных или нарушению логики программы. К примеру, разберем такой псевдокод:
|
||
```rust
|
||
let tcpstream = TcpStream::connect("localhost:8000").await.unwrap();
|
||
loop {
|
||
tokio::select! {
|
||
v = read_some_data(&mut tcpstream) => {
|
||
// do something with data
|
||
},
|
||
_ => tokio::time::sleep(Duration::from_millis(10)) => {}
|
||
}
|
||
println!("Cyclel timeout");
|
||
}
|
||
```
|
||
Выглядит хорошо: ожидаем чтение, а по таймауту перезапускаем цикл. Но что если read_some_data уже успела прочитать какие-то данные до таймаута, но не успела вернуть их? Тогда эти данные будут утеряны навсегда. Такие функции, отмена выполнения которых приводит к потере данных не считаются cancelation safe.
|
||
Примерами таких функций из токио являются:
|
||
`tokio::io::AsyncReadExt::read_exact`
|
||
`tokio::io::AsyncReadExt::read_to_end`
|
||
`tokio::io::AsyncReadExt::read_to_string`
|
||
`tokio::io::AsyncWriteExt::write_all` |