14 KiB
В прошлом уроке мы разобрали, как устроена асинхронность в rust под капотом и как рантайм вызывает код. В этом уроке мы окунемся поглубже в то, как в tokio происходит ожидание готовностей ресурсов, то, как работать с асинхронным вводом/выводом, как делать асинхронные сетевые запросы, работать с файловой системой, и как использовать неблокирующие примитивы синхронизации
Асинхронные чтение/запись
ОР - Умеет работать с асинхронным абстрактным вводом/выводом. В rust для чтения/записи всего (к примеру, файлов, или сокетов) есть трейты Read и Write, но свои задачи они выполняют синхронно. Для асихронного мира существуют трейты AsyncRead и AsyncWrite. Их определение выглядит так:
pub trait AsyncRead {
// Required method
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>>;
}
pub trait AsyncWrite {
// Required methods
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>>;
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>>;
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>>;
// Provided methods
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, Error>> { ... }
fn is_write_vectored(&self) -> bool { ... }
}
Можно по определению методов заметить, что поведение у них идентично Future::poll - если могут выполнить чтение/запись прямо сейчас, то возвращают Poll::Ready, а если нет, то Poll::Pending. Но, с этими трейтами пользователи библиотек не взаимодействуют, для всех типов, для которых реализованы AsyncRead/AsyncWrite реализованы и трейты AsyncReadExt/AsyncWriteExt соответственно. У них интерфейс уже почти идентичен std::io::{Read,Write}. К примеру у AsyncReadExt:
pub trait AsyncReadExt: AsyncRead {
// Provided methods
fn chain<R>(self, next: R) -> Chain<Self, R>
where Self: Sized,
R: AsyncRead { ... }
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
where Self: Unpin { ... }
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where Self: Unpin,
B: BufMut + ?Sized { ... }
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
where Self: Unpin { ... }
fn read_u8(&mut self) -> ReadU8<&mut Self>
where Self: Unpin { ... }
// и другие методы
}
Эти трейты позволяют писать асинхронные чтение/запись почти идентично синхронным версиям, с одним только отличием - добавлением await
Взаимодействие с интернетом и фс
ОР - Может использовать неблокирующее сетевое взаимодействие (TCP, UDP) и работу с файловой системой. Чтобы делать сетевые запросы в tokio реализованы TcpStream, UdpSocket и TcpSocket (асинхронные аналоги таких же синхронных версий из стандартной библиотеки) Давайте попробуем написать небольшой http клиент:
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Подключение к http://google.com
let mut stream = TcpStream::connect(("google.com", 80)).await?;
// Отправка GET запроса
stream.write_all(b"GET / HTTP/1.1\r\nHost: google.com\r\n\r\n").await?;
// Упрощено для примера, но обычно так не делают
// Работает для этого примера, так как
// гугл отсылает ответ одним tcp пакетом
let mut buf = [0u8; 1024];
let len = stream.read(&mut buf).await?;
let buf = buf[..len].to_vec();
let output = String::from_utf8(buf).unwrap();
println!("Returned:\n{output}");
Ok(())
}
Как видите, интерфейс абсолютно такой же, как и у синхронных трейтов в стандартной библиотеке, только добавляется .await после вызова. Для работы с файловой системой все аналогично:
#[tokio::main]
async fn main() -> std::io::Result<()> {
let cargo = tokio::fs::read_to_string("./Cargo.toml").await?;
println!("Cargo.toml content:\n{cargo}");
Ok(())
}
ОР - Умеет использовать неблокирующие примитивы синхронизации и каналы разных видов.
Так же есть похожие на стандартные каналы:
tokio::sync::mpsc::unbounded_channel() - альтернатива std::sync::mpsc::channel() из стандартной библиотеки, но при ожидании результата не блокирует поток, а передает управление
tokio::sync::mpsc::channel(buffer: usize) - альтернатива std::sync::mpsc::sync_channel() из стандартной библиотеки, но при заполнении буфера не блокирует поток, а передает управление
И примитивы синхронизации:
tokio::sync::Mutex - альтернатива std::sync::Mutex, но не блокирует поток при вызове lock, а еще не будет происходить дедлока при пересечении .await. Если один фьючерс залочил значение в стандартном мьютексе, потом вызвал асинхронную функцию, и другому фьючерсу понадобилось залочить этот мьютекс, то возникнет дедлок. Поэтому, если MutexGuard проходит через .await, нужно использовать именно tokio::sync::Mutex
tokio::sync::RwLock - те же преимущества, что и у мьютекса
Квиз ОР - Умеет использовать неблокирующие примитивы синхронизации и каналы разных видов. Чем отличается tokio::sync::mpsc::unbounded_channel() от стандартного std::sync::mpsc::channel()? Он поддерживает передачу только Send + Sync типов Нет, это является требованием для обоих каналов Он использует общий буфер для всех задач Нет, у каждого канала свой буфер Он не блокирует поток при ожидании, а передаёт управление исполнителю Правильно, тем самым исполнитель не простаивает на ожидании Он автоматически очищает буфер после каждого await Нет, очистка буфера не зависит от вызова await
Квиз Что произойдет, если использовать std::sync::Mutex внутри async-функции и удерживать его guard через .await? Мьютекс автоматически разблокируется при .await Нет, мьютекс не знает ничего про асинхронность Может возникнуть дедлок Правильно, после переключения к мьютексу может обратиться другой фьючерс и возникнет дедлок Компилятор выдаст ошибку Нет, компилятор не выдаст ошибку. Но при этом есть линтеры, которые укажут о прохождение мьютекса из стандартной библиотеки через await
Квиз
Что произойдет при переполнении буфера у tokio::sync::mpsc::channel()?
Поток заблокируется до освобождения места
Нет, если буфер заполнен, текущая задача приостанавливается (.await), но поток может продолжить выполнение других задач. Это ключевое отличие от std::sync::mpsc::sync_channel
Новые сообщения будут теряться
Нет, каналы не теряют сообщения
Задача временно уступит управление (await), пока не появится место
Правильно! При переполнении буфера отправка сообщения асинхронно приостанавливает текущую задачу (.await), позволяя планировщику выполнять другие задачи до освобождения места в канале
Произойдет паника
Нет, каналы при переполнении не паникуют
Таймаут по вызову Future
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety. Для ожидания фьючерса с таймаутом в токио есть удобная функция tokio::time::timeout
if let Err(_) = tokio::time::timeout(Duration::from_secs(1), std::future::pending()).await {
println!("не выполнилось в течении 1 секунды");
}
По истечении указанного времени исполнение Future отменяется
Вызов cpu-bound кода из асинхронного
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety. Так как асинхронность в rust кооперативная, возвращать управление потоком нужно как можно быстрее. Но что если у нас вместо io-bound задачи возникает cpu-bound задача? Тогда задачу нужно вымещать в отдельный thread pool. В токио есть встроенный (который упоминается как worker threads), его удобство заключается в том, что ожидать результата синхронноой функции можно через await, не блокируя исполнения, как с ожиданием у обычного thread pool
let res = tokio::task::spawn_blocking(move || {
// cpu-bound задачи. К примеру, перемножение матриц
multipy_matrixes()
}).await?;
Самое интересное, что большинство функций в модуле tokio::fs это такие обертки spawn_blocking над синхронными версиями. К примеру, tokio::fs::read_to_string, что мы использовали ранее в самом токио выглядит так:
pub async fn read_to_string(path: impl AsRef<Path>) -> io::Result<String> {
let path = path.as_ref().to_owned();
// asyncify - вызывает внутри spawn_blocking,
// просто с дополнительной небольшой проверкой
asyncify(move || std::fs::read_to_string(path)).await
}
Это обусловленно тем, у некоторых операционных систем блокирующие чтения файлов. Сетевые же запросы, к примеру, сделаны асинхронно
Cancelation safety
ОР - Понимает, как использовать таймауты, неблокирующее ожидание и cancelation safety. Cancellation safety — это свойство future, которое гарантирует, что если её отменить (вызов drop), это не приведёт к потере данных или нарушению логики программы. К примеру, разберем такой псевдокод:
let tcpstream = TcpStream::connect("localhost:8000").await.unwrap();
loop {
tokio::select! {
v = read_some_data(&mut tcpstream) => {
// do something with data
},
_ => tokio::time::sleep(Duration::from_millis(10)) => {}
}
println!("Cyclel timeout");
}
Выглядит хорошо: ожидаем чтение, а по таймауту перезапускаем цикл. Но что если read_some_data уже успела прочитать какие-то данные до таймаута, но не успела вернуть их? Тогда эти данные будут утеряны навсегда. Такие функции, отмена выполнения которых приводит к потере данных не считаются cancelation safe.
Примерами таких функций из токио являются:
tokio::io::AsyncReadExt::read_exact
tokio::io::AsyncReadExt::read_to_end
tokio::io::AsyncReadExt::read_to_string
tokio::io::AsyncWriteExt::write_all