KotlinのCoroutineを用いた,外部API呼び出しの並列数を指定できるライブラリを作成した話
ユーザベースインターンの原田です.大学院で研究しながら京都でユーザベースのインターンをさせて頂いており,今回初めてブログを書かせて頂きます!
題名にある通り,今回KotlinのCoroutineを使用した並列数を指定して関数を実行できるライブラリ(ParallelExecutor)を作成しましたので,そのことについて投稿させて頂きます.
背景
外部のAPIを呼びだす処理を並列で呼びだしたいが,相手側の都合(サーバーへの負荷等)により並列数を制限したい状況が発生しました.しかしCoroutineは大量に起動出来てしまい,通常では並列数に制限をかけることが出来ません.そこでこれを実現する為に,ParallelExecutorを作成することにしました.
本記事の内容
本記事の内容は以下の通りです
そもそもCoroutineとは何か
Coroutine間で値を転送できるChannelについて
ParallelExecutorの説明
Coroutine
Coroutineは一言で言うと,軽量なスレッドです.そして以下のような特徴を持っています.
- 中断が可能な計算インスタンスである
- 特定のスレッドに束縛されない
ここではまずCoroutineの作成方法を示し,その後でこれらの特徴について説明します.
Coroutineの作成方法
下図はCoroutine作成のイメージです.
CoroutineはCoroutine builderで作成することができます.しかし,その際にはCoroutineScope内で作成する必要があります.CoroutineScopeとはCoroutineが実行される仮想的な場所のようなものです.CoroutineはCoroutineScope内でのみ実行可能です.
実際のコードを作成してみます.Coroutineを使用する為に以下の依存を追加して下さい.
dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC"
}
以下はサンプルコードです.
fun main() { runBlocking { val job = launch { delay(1000L) // 1秒待つ println("World!") // print after delay } println("Hello,") delay(2000L) // プログラムを終了させない為にmain thread で2秒待つ } }
runBlockingは現在のスレッドをブロックしてCoroutineScopeを作り出します.そしてlaunchはCoroutine builderの1つであり,Coroutineを生成することができます.launchはデフォルトでは親のCoroutineScopeで実行するCoroutineを作成します.ここで親のrubBlockingのCoroutineScopeであるmain threadで実行するCoroutineを作成しています.引数でどのCoroutineScopeで実行するかを指定することもできます.
次にCoroutineの特徴について説明します.
中断が可能な計算インスタンス [1]
coroutineが中断可能な計算インスタンスであることについて説明します.ここで計算インスタンスとは処理を記述したコードブロックのことを指しています.よってスレッドは大量に起動できませんが,Coroutineは以下のように10000個など大量に起動しても問題ありません. またCoroutineが中断可能とは,Cortouineの処理を途中で止めて,スレッドを解放することができることを意味します.その中断はsuspend関数と呼ばれる関数で行われます.以下のコードを見てください.
suspend fun apiCall() { println("ApiCall") delay(1000) println("Return") } fun main(args: Array<String>) { runBlocking { println("main1") val job = launch { apiCall() // coroutineを中断し、スレッドを解放する } delay(500) //delay1 println("main2") } }
このdelayはsuspend関数です.呼ばれるとCoroutineを中断しスレッドを解放しする関数です.またsuspend関数はsuspend修飾子を使って自分で宣言すること可能です.ここではcallApiがそれに当たります.このコードは以下の図のように実行されます.
ポイントはsuspend関数を呼びsuspend関数であるcallApiのdelayが呼ばれた後にスレッドを解放している点です.これがCoroutineの特徴でスレッドをブロックすることなく処理を実行できます.jobは処理の集合を表すインスタンスです.
このコードの実行結果は以下のようになります.
main1 ApiCall main2 Return
特定のスレッドに束縛されない
Coroutineは特定のスレッドに束縛されません.つまりCroutineとスレッドは1対1対応ではありません.Croutineはsuspend関数によって中断しスレッドを解放,そしてそのとき空いているスレッドを確保し再開されながら実行を行います.
こうすることでより1つのスレッドを有効に活用することが可能です.
Channelとは
Channelとはキューの一種です.Channelを用いることがCoroutine間で値を転送することが出来ます.
channelのsendを呼ぶことで,値をchannelに書き込みchannelの片方でrecieveを呼ぶことでその値を順に呼び出すことができます.実際のコードは以下の通りです.
fun main() { runBlocking { val channel = Channel<Int>() launch { for (x in 1..5){ channel.send(x * x) //値をchannelに書き込む } } repeat(5) { println(channel.receive()) } //値を取り出す println("Done!") } }
このコードは以下のように書くこともできます.closeは特別な関数でchannelの終了を表すtokenを送ることができます.読み取り側でこのtokenが読み取られると繰り返しが終了し,全ての要素が読み取られたことを保証できます.
fun main() { runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) { channel.send(x * x) } channel.close() } for (item in channel) { println(item) } println("Done!") } }
ParallelExecutorについて
ここで今回作成したParallelExecutorについて説明します.使用は以下の通りです.
並列数を指定してsuspend関数を並列に実行することができる
ParallelExecutorのインスタンスを共有することで,共有した部分で並列数を制御することができる
ParallelExecutorには入力として引数にシーケンスと実行したいsuspend関数を渡すことができる
全てのシーケンスの要素は,ParallelExecutorに渡した関数に渡され実行される
結果はChannelにResult型で書き込まれ,ParallelExecutorはそのChannelを返す
途中で例外が発生すると自動的にchannelは閉じられ,channel最後の要素がその例外を持っている
実際のコードはこちらです. https://github.com/uzabase/ParallelExecutor/blob/master/src/main/kotlin/ParallelExecutor.kt
ParallelExecutorではCoroutineの並列数を指定する為にChannelをセマフォとして用いています.[2]セマフォとは共有資源に対するアクセス可能な数を示すものです.
ParallelExecutorではセマフォに値を送れたCoroutineのみが処理を可能にしています.Channleのsendはsuspend関数なのでCroutineの処置を中断ができます.従ってCoroutineの並列数を制限することができます.
大きな流れを説明します.① でまずCoroutineがinputSeqの大きさ分起動します.その次に②でsemaphoreに値を送ろうとします.③semaphoreに値を送れたCoroutineは処理の開始を行い,④実際の処理が走ります.
ParallelExecutorのが行なっていることのイメージが以下の図です.
④の中身を説明します.
まずGlobalScopeでCroutineをinputSeq(入力として与えたシーケンス)の数起動をさせます.GlobalScopeはデフォルトでは用意されているBackground Thread Poolのスレッドを使用してCoroutineScopeを作成します.コードでは以下の部分です.
job = GlobalScope.launch(handler) {
inputSeq.forEach { input ->
launch {
・
・
そして次に自分で用意したSenderThreadPoolをCoroutineScopeとして指定して,callFunction(input)を呼ぶCoroutineを作成します.コードでは以下の部分です. SenderThreadFactoryの定義
class SenderThreadFactory : ThreadFactory { private var count = 0 override fun newThread(r: Runnable): Thread { return Thread(r, "sender-thread-" + ++count) } }
SenderThreadFactoryによって作成されたPoolを用いてDispatcherを生成(これを渡すことでCoroutineのCoroutineScopeをこのPoolに指定できる)
private val dispatcher = Executors.newFixedThreadPool(capacity, SenderThreadFactory()).asCoroutineDispatcher()
Coroutineを起動する(withContextは値を返すCoroutineBuilderの一つ)
withContext(dispatcher) { runCatching { callFunction(input) } }
そしてその結果をResultに格納し,resultChに送り,semaphoreの値を1つ取り出します.そうすることで待機しているCoroutineが動き出します.コードでは以下の部分です.
}.let { result -> resultCh.send(result) semaphore.receive() result.onFailure { throw it }
またcallFunctionで例外が発生した場合は,例外ハンドラに処理が行き,jobのキャンセルが行われ残りの処理が素通りされるようになっています.そしてresultChとsemaphoreを閉じます.以下が例外ハンドラのコードです.
val handler = CoroutineExceptionHandler { _, exception -> exception.printStackTrace() job?.cancel() resultCh.close(exception) channel.close() }
jobのキャンセルは以下のように実装されています.jobのキャンセルが呼ばれるとisActiveがfalseになります.よって素通りしたい処理をif分で囲っています.
launch { if (isActive) { semaphore.send(Unit) withContext(dispatcher) { ・ ・
まとめ
今回作成したライブラリとCoroutine周りの説明をさせて頂きました. 本ライブラリの実際の使用方法はこちらをご参照下さい. https://github.com/uzabase/ParallelExecutor
参考文献 [1] https://qiita.com/k-kagurazaka@github/items/8595ca60a5c8d31bbe37 [2] https://qiita.com/k-kagurazaka@github/items/0c30cc04dcef306ed3c7