こんにちは!SPEEDAプロダクト開発チームの成です。
去年の秋ごろからRustを触り始め、徐々にRustの魅力に惚れられました。 最初は日々コンパイラーにボコボコにされていましたが、 The Book *1 を読みながら、Rustを少しずつ理解していくと、段々コンパイラーと仲良くなってきて、Rustを書くのも楽しくなりました。
小さいな作業効率化のツールから、Rest Api Server、色んな処理を並列化するBatchなどをRustで作ってきました。最近プロダクトのマイクロサービスの極一部もRustで作っており、幸せ感満喫です!!
本日は、HTTPリクエストを並列化するときハマったことをお話したいと思います。 今日の内容はある程度Rustの知識が必要ですが、初めての方は上の The Book の日本語版 から参考できます。
Rustは独特なメモリ管理仕組みを持ち、安全かつRuntimeで発生したら世界が止まるFullGC的なものも一切ないし、高速な並列処理などが特徴です。めちゃくちゃカッコイイ言語と思いますので、本当に広がってほしいです。なので、初めての方々も是非書いてみてください!
はい、宣伝は以上です。本題に入ります。
今日はサンプルコードを介してハマった点を説明させていただきます。
環境
OS | rustバージョン |
---|---|
macOS 10.14.6 | rustc 1.39.0-nightly |
使ったライブラリ
[dependencies] reqwest = "0.9.20" tokio-threadpool = "0.1.15" futures01 = "0.1.28" futures-preview = { version = "0.3.0-alpha.18", features = ["io-compat"] } failure = "0.1.5"
今回は tokio-threadpool
と futuresの01系
を使って並列処理を実現します。
HTTPリクエストのライブラリは reqwest
を使います。
記事の最後にはRust公式の新しい非同期シンタックス async / await
を試したことを共有したいので、 futuresの03系
も入れました。
同期リクエストの実現
1000回リクエストを順番に実行されるサンプルコードです。
use failure::Fail; use reqwest::{Client, Url}; use std::thread; use std::thread::ThreadId; use std::time::{Duration, SystemTime}; pub type ResultWithError<T> = std::result::Result<T, ErrorWrapper>; #[derive(Fail, Debug)] pub enum ErrorWrapper { #[fail(display = "http request error: {:?}", error)] HttpRequestError { error: reqwest::Error }, } impl From<reqwest::Error> for ErrorWrapper { fn from(error: reqwest::Error) -> Self { ErrorWrapper::HttpRequestError { error } } } fn main() { request_with_main_thread(); } fn request_with_main_thread() { output1("START SINGLE THREAD"); let start_at = SystemTime::now(); let client = Client::new(); (0..1000) .collect::<Vec<i32>>() .iter() .for_each(|_| match send_request(&client) { Ok((thread_id, text)) => output2(thread_id, text.as_str()), Err(error) => output1(format!("{:?}", error).as_str()), }); let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0)); output1(format!("END: {}", spent_time.as_millis()).as_str()) } fn send_request(client: &Client) -> ResultWithError<(ThreadId, String)> { let mut response = client .get(Url::parse("http://localhost:9000/timestamp").unwrap()) .send()?; Ok((thread::current().id(), response.text()?)) } fn output2(thread_id: ThreadId, text: &str) { println!("[{:?}] => {}", thread_id, text); } fn output1(text: &str) { output2(thread::current().id(), text); }
main()
関数以前のコードはエラーハンドリングに関するものなので、説明は割愛します。send_request()
関数ではローカルで別で立ち上がっているRestApiServerにリクエストを投げてシステム時間を返してくれます。実行結果
[ThreadId(1)] => START SINGLE THREAD [ThreadId(1)] => 2019-09-01T18:50:22.706987Z [ThreadId(1)] => 2019-09-01T18:50:22.710136Z [ThreadId(1)] => 2019-09-01T18:50:22.713299Z ...... [ThreadId(1)] => 2019-09-01T18:50:27.398560Z [ThreadId(1)] => 2019-09-01T18:50:27.404358Z [ThreadId(1)] => 2019-09-01T18:50:27.407224Z [ThreadId(1)] => END: 4711
当たり前ですが、リクエスト処理は全部メインスレッド ThreadId(1)
で順番に実行されてます。1000回リクエスは5秒近くかかりました。
非同期リクエストの実現
1000回リクエストを10スレッドに捌けて実行されるサンプルコードです。
use failure::Fail; use futures01::{future, Future}; use reqwest::r#async::Client as AsyncClient; use reqwest::Url; use std::thread; use std::thread::ThreadId; use std::time::{Duration, SystemTime}; use tokio_threadpool::{Builder, SpawnHandle}; fn main() { request_with_multi_thread(); } fn request_with_multi_thread() { output1("START MULTI THREAD"); let start_at = SystemTime::now(); let client = AsyncClient::new(); let thread_pool_size = 10; let thread_pool = Builder::new().pool_size(thread_pool_size).build(); let mut handles = Vec::<SpawnHandle<(ThreadId, String), ErrorWrapper>>::new(); while handles.iter().count() <= 1000 { let cloned_client = client.clone(); handles.push(thread_pool.spawn_handle(future::lazy(move || { send_request_for_future(&cloned_client) }))); } handles.iter_mut().for_each(|handle| match handle.wait() { Ok((thread_id, text)) => output2(thread_id, text.as_str()), Err(error) => output1(format!("{:?}", error).as_str()), }); thread_pool.shutdown_now(); let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0)); output1(format!("END: {}", spent_time.as_millis()).as_str()) } fn send_request_for_future( client: &AsyncClient, ) -> impl Future<Item = (ThreadId, String), Error = ErrorWrapper> { client .get(Url::parse("http://localhost:9000/timestamp").unwrap()) .send() .and_then(|mut response| response.text()) .map(|text| (thread::current().id(), text)) .from_err() } fn output2(thread_id: ThreadId, text: &str) { println!("[{:?}] => {}", thread_id, text); } fn output1(text: &str) { output2(thread::current().id(), text); }
エラーハンドリングに関するコードは省略しました。
このサンプルでは
10スレッド
で 1000回リクエストを捌けようとしてます。タスク
Future(send_request_for_futureの戻り値)
をキューに溜まって、スレッドプールtokio_threadpool
にアイドルなスレッドがあれば、キューからタスクを取り出して実行する。
(行:22~28)その結果達
SpawnHandle
をメインスレッドでまとめる。
(行:30~33)。実行結果
[ThreadId(1)] => START MULTI THREAD [ThreadId(10)] => 2019-09-01T19:02:53.924756Z [ThreadId(6)] => 2019-09-01T19:02:53.930478Z [ThreadId(11)] => 2019-09-01T19:02:53.929056Z [ThreadId(11)] => 2019-09-01T19:02:53.934684Z [ThreadId(13)] => 2019-09-01T19:02:53.932217Z [ThreadId(7)] => 2019-09-01T19:02:53.918978Z [ThreadId(7)] => 2019-09-01T19:02:53.921761Z ...... [ThreadId(10)] => 2019-09-01T19:02:54.454774Z [ThreadId(14)] => 2019-09-01T19:02:54.456831Z [ThreadId(15)] => 2019-09-01T19:02:54.458395Z [ThreadId(15)] => 2019-09-01T19:02:54.459974Z [ThreadId(1)] => END: 593
リクエスト処理は全部メインスレッド以外のスレッドで行われて、かかった時間もほぼ直列の1/8です。
ハマったこと
1. HttpClientが違います。
reqwest::Client
をFutureの中で使うと下記のエラーが発生します。
[ThreadId(1)] => HttpRequestError { error: Error(BlockingClientInFutureContext, "http://localhost:9000/timestamp") }
Futureの中では reqwest::r#async::Client
を使わないといけないです。
2. reqwest::r#async::Client
をマルチスレッド間で共有しないと、大量リクエストが発生したらリソースが枯渇してしまいます。
let cloned_client = client.clone(); handles.push(thread_pool.spawn_handle(future::lazy(move || { send_request_for_future(&cloned_client) })));
この部分を下記のように変えたら
handles.push(thread_pool.spawn_handle(future::lazy(move || { send_request_for_future(&AsyncClient::new()) })));
下記のエラーが大量に出てきます。(リクエストが全部失敗ではないですが)
[ThreadId(1)] => HttpRequestError { error: Error(Hyper(Error(Io, Os { code: 54, kind: ConnectionReset, message: "Connection reset by peer" })), "http://localhost:9000/timestamp") }
リクエスト先のサーバからコネクションを勝手に切断したためエラーになったと思います。
reqwest::r#async::Client
のソースを見てみると、確かにClientには connection pool
を持っています。
並列で数多くClientを生成すると、余計にリクエスト先のサーバとコネクションを確立し過ぎで、途中で切断されることになります。
/// An asynchronous `Client` to make Requests with. /// /// The Client has various configuration values to tweak, but the defaults /// are set to what is usually the most commonly desired value. To configure a /// `Client`, use `Client::builder()`. /// /// The `Client` holds a connection pool internally, so it is advised that /// you create one and **reuse** it. #[derive(Clone)] pub struct Client { inner: Arc<ClientRef>, }
3. Future.wait()
を呼び出すときは、そのFutureは 別のスレッドで実行される保証 があるかどうかを考えないといけないです。
最初 send_request_for_future()
は下記のように実装していました。
fn send_request_for_future(client: &AsyncClient) -> ResultWithError<(ThreadId, String)> { Ok(( thread::current().id(), client .get(Url::parse("http://localhost:9000/timestamp").unwrap()) .send() .wait()? .text() .wait()?, )) }
reqwest::r#async::Client
は使っていますが、Futureを返せずに実際の処理結果を返そうとしていました。.send()
と .text()
はそれぞれFutureを返すので、実際の処理結果を取り出すために .wait()?
を2回呼び出しました。その結果、実行した直後に [ThreadId(1)] => START MULTI THREAD
だけが出力され、その後は一切処理が進まなかったです。
原因は send_request_for_future()
はthread_pool中のスレッドから実行されますが、そこで呼び出している2回の .wait()?
は更に他のスレッドに実行してくれないと、処理が進まないです。thread_pool中のスレッドが全部 send_request_for_future()
の実行に使ってしまったら、処理全体が進まなくなります。もしthread_pool_sizeがリクエスト回数より多い場合(この例だと let thread_pool_size = 1001;
)はすべてのリクエストが正常に終わります。
send_request_for_future()
がFutureを返すし、メインスレッドで SpawnHandle.wait()
を呼び出すように実装したら、処理全体が正常に終えました。
まとめ
ほぼどの技術のハマりも同じだと思います。本質を理解したら当たり前のように見えますが、理解する前には詳しい人に聞いたり、ググったりしてバタバタしても中々解決しないとイライラしてしまいますね。今回のハマリポイント、特に3点目は色々ググっても類似のサンプルソースがなかったので、困りました。もし同じ問題に困っている方がいらっしゃったら、この記事が参考にできるならば嬉しいです。
ここで使っているサンプルソースは下記のプロジェクトからcloneできます。
https://github.com/dimmy82/rust-concurrency-request
(おまけ)async / await
今年の7月ぐらい、新しい非同期シンタックス async / await
がリリースされて、ちょっと触ってみたので共有させていただきます。もし間違っているところがあればご指摘ください。
use failure::Fail; use futures::executor::block_on; use reqwest::{Client, Url}; use std::thread; use std::thread::ThreadId; use std::time::{Duration, SystemTime}; fn main() { block_on(request_with_async_await()); // this is not a parallel request example } async fn request_with_async_await() { output1("START ASYNC AWAIT"); let start_at = SystemTime::now(); let client = Client::new(); let future1 = send_request_async(&client); let future2 = send_request_async(&client); match future1.await { Ok((thread_id, text)) => output2(thread_id, text.as_str()), Err(error) => output1(format!("{:?}", error).as_str()), }; match future2.await { Ok((thread_id, text)) => output2(thread_id, text.as_str()), Err(error) => output1(format!("{:?}", error).as_str()), }; let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0)); output1(format!("END: {}", spent_time.as_millis()).as_str()) } async fn send_request_async(client: &Client) -> ResultWithError<(ThreadId, String)> { let mut response = client .get(Url::parse("http://localhost:9000/timestamp").unwrap()) .send()?; Ok((thread::current().id(), response.text()?)) } fn output2(thread_id: ThreadId, text: &str) { println!("[{:?}] => {}", thread_id, text); } fn output1(text: &str) { output2(thread::current().id(), text); }
結論から言うと、上記サンプルのように
let future1 = /* async 関数を呼び出す */; let future2 = /* async 関数を呼び出す */; future1.await; future2.await;
という書き方は非同期にならず、同期処理になります。 async 関数は await で呼び出されたタイミングで実行される からです。
実行結果
[ThreadId(1)] => START ASYNC AWAIT [ThreadId(1)] => 2019-09-01T21:23:12.791994Z [ThreadId(1)] => 2019-09-01T21:23:12.794254Z [ThreadId(1)] => END: 8
async 関数は処理結果(今回の例だと ResultWithError<(ThreadId, String)>
)を返すように実装しますが、実際future1の型は impl core::future::Future<Output=ResultWithError<(ThreadId, String)>>
になります。なので、もし非同期処理にしたければ、future1, future2を thread_pool.spawn_handle
的な関数に渡して実行してもらうじゃないかなと思います。
ただ、ここの core::future::Future
は最近stdに追加されたFuture Traitですが、 futures01::Future
とは全然違うものです。なので、使っているthread_poolのバージョンはまだ対応していないようです。
今後 core::future::Future
に関する使い方が分かったらまた更新します。
以上、よろしくお願いいたします。