UZABASE Tech Blog

〜迷ったら挑戦する道を選ぶ〜 株式会社ユーザベースの技術チームブログです。

Gaugeのsetupとteardownステップを用いて効率的に読みやすいテストを書く

こんにちは!SPEEDA開発チームの工藤です。

大分時間が開いてしまいましたが、Gaugeシリーズの第四回目です。

今回はe2eテスト書く際には必須であろうSet Up/Tear Down Stepsを、Gaugeではどのように実現できるのかをSPEEDA開発チームでの実例も交えてお伝えできればと思います。

過去3回分の記事はこちらから↓

  1. Gauge Test Automation Toolとアジャイル開発
  2. GaugeのConceptを用いてテストシナリオをより仕様書のように記述する
  3. GaugeのParameterを使いこなす

GaugeにおけるSet Up/Tear Down Stepsの実現方法

Gaugeには、Set Up StepsやTear Down Stepsを実現できる手段がいくつか用意されています。

用途に合わせて下記のいずれかを選択して使います。

  • Contexts
  • Tear Down Steps
  • Execution Hooks

上記3つについて順番にお伝えしていきます。

Contexts

GaugeではSet Up StepsをContextsと呼んでいます。 context stepsを使用することで、SpecファイルのScenarioの実行に必要な条件を指定できます。

Specファイル内先頭のScenarioの前にStepを記述すると、そのStepが全てのScenarioの最初に実行されます。(GaugeにおけるSpecやScenarioなどのワードに馴染みのない方は第一回目の記事をご覧ください)

下記例ではScenario 1の前に「ユーザーAでログイン」、「プロジェクトページに遷移する」というStepを記述していて、このStepがScenario 1、2の最初に実行されます。

# プロジェクトの削除

context steps
* ユーザーAでログイン
* プロジェクトページに遷移する

Scenario 1
## 1つのプロジェクトを削除
* プロジェクト"project_a"を削除
* プロジェクト"project_a"が削除されていることを確認

Scenario 2
## 複数のプロジェクトを削除
* プロジェクトリスト上の全てのプロジェクトを削除
* プロジェクトリストが空であることを確認

各Scenarioで必要だがあまり仕様的には重要ではないセットアップ処理などをContextsにまとめることで、Specificationの冗長さをなくすことができます。

SPEEDA開発ではSpecファイルをページ単位で切ることが多いのでテスト対象のページに遷移するステップや、ログイン処理をここに書くことが多いです。

Tear Down Steps

Tear Down StepsはSpecファイル内最後のScenarioの後に記載します。 Scenarioの実行を終えるために必要なStepがあればTear Down Stepsとして定義します。

3つ以上のアンダースコアを記述することでTear Down Stepsを指定することができます。

___
* Tear down step 1
* Tear down step 2
* Tear down step 3

下記の例では、アンダースコアの後に記述されている「ユーザーAでログアウト」と「ユーザーAを削除」がTear Down Stepsになります。このSpecificationが実行されると、下記の順で実行されます。

  1. Contextsの実行
  2. 1つのプロジェクトを削除のScenarioの実行
  3. Tear Down Stepsの実行
  4. Contextsの実行
  5. 複数のプロジェクトを削除のScenarioの実行
  6. Tear Down Stepsの実行
# プロジェクトの削除

* ユーザーAを作成
* ユーザーAでログイン

Scenario 1
## 1つのプロジェクトを削除
* プロジェクト"project_a"を削除
* プロジェクト"project_a"が削除されていることを確認

Scenario 2
## 複数のプロジェクトを削除
* プロジェクトリスト上の全てのプロジェクトを削除
* プロジェクトリストが空であることを確認
___
ここからTear Down Steps
* ユーザーAでログアウト
* ユーザーAを削除

SPEEDA開発では実はあまりTear Down Stepsは使っていません、Tear Downとしては後述するExecution Hooksを使う方が多いです。

Execution hooks

Execution hooksを使うとSuite,Spec,Scenario,Stepの単位で任意のテストコードを実行することができます。

ContextsやTear DownはSpecファイル毎且つScenarioにしか定義できませんが、Execution HooksはSpecファイルを跨いで且つ様々な単位で定義できます。

import com.thoughtworks.gauge.*

class ExecutionHooksExample {
    @BeforeSuite
    fun beforeSuite() {
        // 全てのテスト実施前の最初に一度だけ実行される処理
    }

    @AfterSuite
    fun afterSuite() {
        // 全てのテスト実施後の最後に一度だけ実行される処理
    }

    @BeforeSpec
    fun beforeSpec() {
        // 各Specファイルのテスト実施の先頭に一度だけ実行される処理
    }

    @AfterSpec
    fun afterSpec() {
        // 各Specファイルのテスト実施の最後に一度だけ実行される処理
    }

    @BeforeScenario
    fun beforeScenario() {
        // 各Scenario実施前に実行される処理
    }

    @AfterScenario
    fun afterScenario() {
        // 各Scenario実施後に実行される処理
    }

    @BeforeStep
    fun beforeStep() {
        // 各Step実施前に実行される処理
    }

    @AfterStep
    fun afterStep() {
        // 各Scenario実施後に実施される処理
    }
}

SPEEDAでは下記のような処理はBefore Suiteで実行しています

  • 一度だけ設定ファイルを読み込む
  • Read-Onlyデータの投入

また下記のような処理はAfter Scenarioで実行しています

  • ログアウト
  • WebdriverのClose処理

その他にもDBやモックのセットアップ処理もExecution Hooksを使用して任意のタイミングで実行しています。

Execution hooksを特定のTagが指定されている場合のみ実行されるようにすることも可能です。その場合は下記のように指定します。

// tag1 または tag2がついているScenarioでのみ前処理として下記を実行
@BeforeScenario(tags = {"tag1, tag2"})
fun setupDataBase() {
    // Code for before scenario
}

まとめ

GaugeはExecutable Specificationを謳っていてSpecファイルやScenarioは実行可能な"仕様書"である必要があります。

SPEEDA開発では今回ご紹介した機能を使ってSpecificationファイルの記述を出来るだけ簡潔にして、より仕様書として読みやすくするよう心がけています。

KotlinのCoroutineを用いた,外部API呼び出しの並列数を指定できるライブラリを作成した話

KotlinのCoroutineを用いた,外部API呼び出しの並列数を指定できるライブラリを作成した話

ユーザベースインターンの原田です.大学院で研究しながら京都でユーザベースのインターンをさせて頂いており,今回初めてブログを書かせて頂きます!

題名にある通り,今回KotlinのCoroutineを使用した並列数を指定して関数を実行できるライブラリ(ParallelExecutor)を作成しましたので,そのことについて投稿させて頂きます.

背景

外部のAPIを呼びだす処理を並列で呼びだしたいが,相手側の都合(サーバーへの負荷等)により並列数を制限したい状況が発生しました.しかしCoroutineは大量に起動出来てしまい,通常では並列数に制限をかけることが出来ません.そこでこれを実現する為に,ParallelExecutorを作成することにしました.

本記事の内容

本記事の内容は以下の通りです

  • そもそもCoroutineとは何か

  • Coroutine間で値を転送できるChannelについて

  • ParallelExecutorの説明

    Coroutine

    Coroutineは一言で言うと,軽量なスレッドです.そして以下のような特徴を持っています.

  • 中断が可能な計算インスタンスである
  • 特定のスレッドに束縛されない

ここではまずCoroutineの作成方法を示し,その後でこれらの特徴について説明します.

Coroutineの作成方法

下図はCoroutine作成のイメージです. f:id:harada-777:20191015180809p:plain:w400:left
CoroutineはCoroutine builderで作成することができます.しかし,その際にはCoroutineScope内で作成する必要があります.CoroutineScopeとはCoroutineが実行される仮想的な場所のようなものです.CoroutineはCoroutineScope内でのみ実行可能です. 実際のコードを作成してみます.Coroutineを使用する為に以下の依存を追加して下さい.

dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC"
}

以下はサンプルコードです.

fun main() {
    runBlocking {
        val job = launch {
            delay(1000L) // 1秒待つ
            println("World!") // print after delay
        }
        println("Hello,")
        delay(2000L) // プログラムを終了させない為にmain thread で2秒待つ
    }
}

runBlockingは現在のスレッドをブロックしてCoroutineScopeを作り出します.そしてlaunchはCoroutine builderの1つであり,Coroutineを生成することができます.launchはデフォルトでは親のCoroutineScopeで実行するCoroutineを作成します.ここで親のrubBlockingのCoroutineScopeであるmain threadで実行するCoroutineを作成しています.引数でどのCoroutineScopeで実行するかを指定することもできます.

次にCoroutineの特徴について説明します.

中断が可能な計算インスタンス [1]

coroutineが中断可能な計算インスタンスであることについて説明します.ここで計算インスタンスとは処理を記述したコードブロックのことを指しています.よってスレッドは大量に起動できませんが,Coroutineは以下のように10000個など大量に起動しても問題ありません. またCoroutineが中断可能とは,Cortouineの処理を途中で止めて,スレッドを解放することができることを意味します.その中断はsuspend関数と呼ばれる関数で行われます.以下のコードを見てください.

suspend fun apiCall() {
    println("ApiCall")
    delay(1000)
    println("Return")
}
fun main(args: Array<String>) {
    runBlocking {
        println("main1")
        val job = launch {
            apiCall() // coroutineを中断し、スレッドを解放する
        }
        delay(500) //delay1
        println("main2")
    }
}

このdelayはsuspend関数です.呼ばれるとCoroutineを中断しスレッドを解放しする関数です.またsuspend関数はsuspend修飾子を使って自分で宣言すること可能です.ここではcallApiがそれに当たります.このコードは以下の図のように実行されます. f:id:harada-777:20191015180549p:plain
ポイントはsuspend関数を呼びsuspend関数であるcallApiのdelayが呼ばれた後にスレッドを解放している点です.これがCoroutineの特徴でスレッドをブロックすることなく処理を実行できます.jobは処理の集合を表すインスタンスです. このコードの実行結果は以下のようになります.

main1
ApiCall
main2
Return

特定のスレッドに束縛されない

Coroutineは特定のスレッドに束縛されません.つまりCroutineとスレッドは1対1対応ではありません.Croutineはsuspend関数によって中断しスレッドを解放,そしてそのとき空いているスレッドを確保し再開されながら実行を行います.

f:id:harada-777:20191015182617p:plain:w400:left
こうすることでより1つのスレッドを有効に活用することが可能です.

Channelとは

Channelとはキューの一種です.Channelを用いることがCoroutine間で値を転送することが出来ます. f:id:harada-777:20191015181603p:plain:w400:left
channelのsendを呼ぶことで,値をchannelに書き込みchannelの片方でrecieveを呼ぶことでその値を順に呼び出すことができます.実際のコードは以下の通りです.

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..5){
                channel.send(x * x) //値をchannelに書き込む
            }
        }
        repeat(5) { println(channel.receive()) } //値を取り出す
        println("Done!")
    }
}

このコードは以下のように書くこともできます.closeは特別な関数でchannelの終了を表すtokenを送ることができます.読み取り側でこのtokenが読み取られると繰り返しが終了し,全ての要素が読み取られたことを保証できます.

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..5) {
                channel.send(x * x)
            }
            channel.close()
        }
        for (item in channel) {
            println(item)
        }
        println("Done!")
    }
}

ParallelExecutorについて

ここで今回作成したParallelExecutorについて説明します.使用は以下の通りです.

  • 並列数を指定してsuspend関数を並列に実行することができる

  • ParallelExecutorのインスタンスを共有することで,共有した部分で並列数を制御することができる

  • ParallelExecutorには入力として引数にシーケンスと実行したいsuspend関数を渡すことができる

  • 全てのシーケンスの要素は,ParallelExecutorに渡した関数に渡され実行される

  • 結果はChannelにResult型で書き込まれ,ParallelExecutorはそのChannelを返す

  • 途中で例外が発生すると自動的にchannelは閉じられ,channel最後の要素がその例外を持っている

実際のコードはこちらです. https://github.com/uzabase/ParallelExecutor/blob/master/src/main/kotlin/ParallelExecutor.kt

ParallelExecutorではCoroutineの並列数を指定する為にChannelをセマフォとして用いています.[2]セマフォとは共有資源に対するアクセス可能な数を示すものです. f:id:harada-777:20191015181641p:plain:w400:left
ParallelExecutorではセマフォに値を送れたCoroutineのみが処理を可能にしています.Channleのsendはsuspend関数なのでCroutineの処置を中断ができます.従ってCoroutineの並列数を制限することができます. 大きな流れを説明します.① でまずCoroutineがinputSeqの大きさ分起動します.その次に②でsemaphoreに値を送ろうとします.③semaphoreに値を送れたCoroutineは処理の開始を行い,④実際の処理が走ります. ParallelExecutorのが行なっていることのイメージが以下の図です. f:id:harada-777:20191015181807p:plain ④の中身を説明します. まずGlobalScopeでCroutineをinputSeq(入力として与えたシーケンス)の数起動をさせます.GlobalScopeはデフォルトでは用意されているBackground Thread Poolのスレッドを使用してCoroutineScopeを作成します.コードでは以下の部分です.

job = GlobalScope.launch(handler) {
            inputSeq.forEach { input ->
                launch { 
                            ・
                            ・

そして次に自分で用意したSenderThreadPoolをCoroutineScopeとして指定して,callFunction(input)を呼ぶCoroutineを作成します.コードでは以下の部分です. SenderThreadFactoryの定義

class SenderThreadFactory : ThreadFactory {
    private var count = 0
    override fun newThread(r: Runnable): Thread {
        return Thread(r, "sender-thread-" + ++count)
    }
}

SenderThreadFactoryによって作成されたPoolを用いてDispatcherを生成(これを渡すことでCoroutineのCoroutineScopeをこのPoolに指定できる)

private val dispatcher = Executors.newFixedThreadPool(capacity, SenderThreadFactory()).asCoroutineDispatcher()

Coroutineを起動する(withContextは値を返すCoroutineBuilderの一つ)

withContext(dispatcher) {
                        runCatching {
                            callFunction(input)
                        }
                    }

そしてその結果をResultに格納し,resultChに送り,semaphoreの値を1つ取り出します.そうすることで待機しているCoroutineが動き出します.コードでは以下の部分です.

 }.let { result ->
    resultCh.send(result)
    semaphore.receive()
    result.onFailure {
    throw it
}

またcallFunctionで例外が発生した場合は,例外ハンドラに処理が行き,jobのキャンセルが行われ残りの処理が素通りされるようになっています.そしてresultChとsemaphoreを閉じます.以下が例外ハンドラのコードです.

val handler = CoroutineExceptionHandler { _, exception ->
    exception.printStackTrace()
    job?.cancel()
    resultCh.close(exception)
    channel.close()
   }

jobのキャンセルは以下のように実装されています.jobのキャンセルが呼ばれるとisActiveがfalseになります.よって素通りしたい処理をif分で囲っています.

launch {
    if (isActive) {
        semaphore.send(Unit)
        withContext(dispatcher) {
                ・
                ・

まとめ

今回作成したライブラリとCoroutine周りの説明をさせて頂きました. 本ライブラリの実際の使用方法はこちらをご参照下さい. https://github.com/uzabase/ParallelExecutor

参考文献 [1] https://qiita.com/k-kagurazaka@github/items/8595ca60a5c8d31bbe37 [2] https://qiita.com/k-kagurazaka@github/items/0c30cc04dcef306ed3c7

ReactとReactHooksを使って、Flux的なアーキテクチャを実現する

こんにちは。SPEEDA開発チームの冨田です。

昨今のフロントエンドでは、Fluxというアーキテクチャが利用されることが多くなってきています。SPEEDAでもVueを使っている画面がありますが、そこではVuexというVue向けのFluxライブラリで状態管理をしています。

Fluxではデータの流れを一方向にすることで見通しのよい設計が行えるようになります。

今回は、素のReactを使ってデータの流れを単一方向にする設計を紹介します。

今回作ってみるもの

Todoアプリを作ってみましょう。 以下のようなことができる画面を作ります。なお、今回はデータの永続化は考えないものとします(つまり、ウェブページを更新すると、全て消えてしまいます)。

  • Todoを追加することができる
  • 全てのTodoのリストを見ることができる
  • Todoの完了状態を変更することができる

セットアップ

簡単のため、CodeSandboxなどで、React + TypeScriptなプロジェクトを作っておくとよいでしょう。以下は、今回作成したプロジェクトになります。

codesandbox.io

1. GettersとActionsをつくる

Todoアプリを実現するためのカスタムフックを作りましょう。

Gettersはコンポーネントからアクセスできるようにする(露出する)データの集まりです。Actionsはそれを呼び出すことで、なんらかの副作用を起こし、Gettersを更新する関数の集まりです(正確には、内部のStateを変更することで、Gettersが更新される)。これらはFluxで定義されているものと同様の概念だと考えていただいてよいでしょう。

今回はGetters, Actionsとして以下のものを定義しています。

  • todos: Todoのリストを保持する配列
  • add: 文字列を受け取り、Todoのリストに新規のTodoを追加する
  • update: idと真偽値を受け取り、リストの中の該当するTodoの完了状態を変更する
import { useState } from "react";

interface Todo {
  description: string;
  done: boolean;
}

interface State {
  todos: Todo[];
}

interface Getters {
  todos: Array<Todo & { id: number }>;
}

interface Actions {
  add(description: string): void;
  update(id: number, done: boolean): void;
}

export type Store = Getters & Actions;

export function useTodo(): Getters & Actions {
  const [state, setState] = useState<State>({ todos: [] });
  const { todos } = state;

  function add(description: string) {
    setState({
      todos: [...todos, { description, done: false }]
    });
  }

  function update(id: number, done: boolean) {
    setState({
      todos: todos.map((it, i) => (i !== id ? it : { ...it, done }))
    });
  }

  return {
    todos: todos.map((it, id) => ({ ...it, id })),
    add,
    update
  };
}

2. GettersとActionsをどこのコンポーネントからでもアクセスできるようにする

useContextを使います。 useContextは、createContextにより、あらかじめ生成しておいたコンテキストにどこからでもアクセスできるようにするReactHooksです。

例:Appコンポーネント内で、Context.Providervalue propsに渡したオブジェクトがChildElementコンポーネント内のuseContextで得られます。

const Context = createContext({});

const ChildElement = () => {
  const { foo } = useContext(Context);
  return (
    <div>{foo}</div>
  );
}

const App = () => {
  return (
    <Context.Provider value={{ foo: "bar" }}>
      <ChildElement />
    </Context.Provider>
  );
};

上記の仕組みを利用して、GettersとActionsに対してどこからでもアクセスできるようにし、さらにGettersに変更が行われると、再レンダーされるようにします。

createContextでコンテキストを作成します。ここでは引数に{} as anyとやってしまっていますが、Providerでvalueを提供しないときに初期値として得られるものなので、これでよいでしょう。

StoreProviderコンポーネントは、Context.Providerをラップしたものです。コンテキストにuseTodoで生成したGettersとActionsを保存し、子コンポーネントをレンダリングします。 ユーザーに、Contextやカスタムフックを意識させないための工夫です。

useContextもReactのuseContextをラップしたものです。こちらも、Contextを意識させないために作っています。

ここで定義したStoreProviderとuseContextを利用することで、画面のコンポーネントではカスタムフックについて意識することなく、useContextを呼ぶだけでなんか知らんけどGettersとActionsが使える、という状態になります。

import React, { createContext, useContext as useContextOriginal, FC } from "react";
import { useTodo, Store } from "./Store";

interface AppStore {
  todo: Store;
}

const Context = createContext<AppStore>({} as any);
const { Provider } = Context;

export const StoreProvider: FC = ({ children }) => {
  const todo = useTodo();
  return <Provider value={{ todo }}>{children}</Provider>;
};

export function useContext() {
  return useContextOriginal(Context);
}

3. Todoを表示するコンポーネントを作る

ユーザーが入力しているdescriptionについてはこのコンポーネントで管理するようにしています。これも上で作ったGetters, Actionsで管理してもよいのですが、useStateとの共存も示したかったので、今回はここに書きました。

実装自体は、普通のulとinput, buttonを使ったコンポーネントですが、useContextを利用することで、todos, add, updateにアクセスできるようになりました。

追加ボタンが押され、addが呼び出されることで、todosが更新され、再レンダリングが行われます。

チェックボックスが変更されることで、updateが呼び出され、todosが更新され、再レンダリングが行われます。

Getters(todos)は、画面をレンダリングするために必要なもの。Actions(add, update)は呼び出すことで、状態を変更し、再レンダリングを促すもの。データの流れが単一方向になっているのがわかるでしょうか。

import React, { FC, useState } from "react";
import { useContext } from "./Flux";

export const TodoComponent: FC = () => {
  const [currentDescription, setCurrentDescription] = useState("");
  const { todo: { todos, add, update } } = useContext();

  function onAddClick() {
    setCurrentDescription("");
    add(currentDescription);
  }

  return (
    <>
      <ul>
        {todos.map(({ id, description, done }) => (
          <li key={id}>
            {description}
            <input
              type="checkbox"
              checked={done}
              onChange={e => update(id, e.target.checked)}
            />
          </li>
        ))}
      </ul>
      <input
        value={currentDescription}
        onChange={e => setCurrentDescription(e.target.value)}
      />
      <button onClick={onAddClick}>追加</button>
    </>
  );
};

4. アプリコンポーネントを作る

最後に、アプリコンポーネント(一番外側のコンポーネント)を作ります。

2で作ったStoreProviderがここで登場します。 3で作ったTodoComponentをStoreProviderで包むだけです。 これにより、内部で状態が変化したときに、再レンダリングが走ります。

import React from "react";
import { render } from "react-dom";
import { TodoComponent } from "./TodoComponent";
import { StoreProvider } from "./Flux";

const App = () => {
  return (
    <StoreProvider>
      <TodoComponent />
    </StoreProvider>
  );
};

render(<App />, document.getElementById("root"));

5. テストを書く

順番が前後して申し訳ないのですが、最後におまけとして、Getters, Actionsのテストの書き方をご紹介します。

ReactのカスタムフックはReactコンポーネント内でしか呼び出すことができません。 すなわち、テスト用のReactコンポーネント内でカスタムフックを呼び出し、呼び出した結果をテストすることになるでしょう。

react-domが提供するactという関数のコールバックでレンダリングやカスタムフックが生成したGetters, Actionsを取得することができます。

今回はbeforeEachで、テスト用のコンポーネントをマウントしさらにuseTodoが返すGetters, Actionsを変数に保持しています。Specification内では得られたGetterやActionsのテストのみに注力しています。

注意点としてはact内でActionsを呼び出せるのは1回で、何度も呼び出したい場合はactを何度も書く必要があることです。actを抜けると、状態がGetterに反映されるイメージです。

テストケースは3つです。 ひとつめはtodosの初期状態が空配列であること。 ふたつめはaddすると、todosに要素がひとつ追加されること。 みっつめは、addした要素に対しupdateをかけると、その要素のdoneの状態が、指定したとおりに変更されること。

import React from "react";
import { render } from "react-dom";
import { act } from "react-dom/test-utils";
import { useTodo, Store } from "../Store";

describe("#useTodo", () => {
  let container;
  describe("useTodo called.", () => {
    let state: Store;
    beforeEach(() => {
      container = document.createElement("div");
      const App = () => ((state = useTodo()), null);
      act(() => {
        render(<App />, container);
      });
    });
    afterEach(() => (container = null));

    it("should have action and state.", () => {
      expect(state.todos).toEqual([]);
    });

    it("should add todos when 'add' called.", () => {
      act(() => state.add("foo"));
      expect(state.todos).toEqual([{ description: "foo", done: false, id: 0 }]);
    });

    it("should update done status when 'update' called.", () => {
      act(() => state.add("foo"));
      act(() => state.add("bar"));
      act(() => state.update(0, true));
      expect(state.todos).toEqual([
        { description: "foo", done: true, id: 0 },
        { description: "bar", done: false, id: 1 }
      ]);
    });
  });
});

以上のように、素のReactだけでFlux的なアーキテクチャを実現することができました。 よろしければ、ぜひお試しください。

pytest-mock使ってハマったこと

こんにちは。

SPEEDA開発チームの掛川です。

現在、私が参画しているプロジェクトではPythonを使ってサービスの開発を行なっています。 私自身、Pythonを書くのは今回が初めてなのですが、 テストを書く際にハマったことについて記事にしていきたいと思います。

環境といろいろ

・環境
    OS     Mac OS X  10.14.5
    python 3.7.3
・ライブラリ
    pytest 5.1.2
    pytest-mock 1.11.1


私が今まで参画していたプロジェクトでは、モックライブラリはMockKを使用していた(使用言語はKotlinでした)のですが 「MockitoやMockKと同じように表現したい時にpytestではどうやって書いたらいいの?」 と疑問に思いハマったことの中から今回は以下の2つの内容にフォーカスしていきます。

  • classメソッドをモックしたい時
  • 引数に応じて返す値を変えたい時


classメソッドをモックしたい時

今回は請求書を発行するシステムを想定しました。

f:id:kkyki:20191011013258p:plain
請求書のイメージ

まずは、請求書の発行を行うためのusecaseとusecaseのテストを書いてみました。
このusecaseで行いたいことは4つです。

  1. 渡されたuser_idをもとに顧客情報(customer)を取得してくる
  2. 渡されたuser_idをもとに購入情報(Details)を取得してくる
  3. 購入情報(Details)から購入の合計金額を算出する
  4. 請求書(Invoice)の発行に必要な情報をControllerに返す
class InvoiceUseCase:

    @classmethod
    def publish(cls, user_id):
        # 1.顧客情報(customer)の取得
        customer = CustomerGateway.get(user_id)
        # 2.購入情報(Details)を取得
        details = SalesHistoriesGateway.get(user_id)
        # 4. 請求書情報(Invoice)を返却
        return Invoice(
            billing_date=datetime.date(2019, 1, 1),
            customer=customer,
            details=details,
            total=details.total())  # 3. 合計金額を算出
class TestInvoiceSheetUseCase:

    def test_publish(self, mocker):
        # Customerをモックする
        customer = mocker.Mock(Customer)
        customer_gateway_mock = mocker.patch.object(
            CustomerGateway,
            'get',
            return_value=customer)

        # Detailをモックする
        detail1 = mocker.Mock(Detail)
        detail2 = mocker.Mock(Detail)
        details = Details(details=[detail1, detail2])
        sales_histories_gateway_mock = mocker.patch.object(
            SalesHistoriesGateway,
            'get',
            return_value=details)

        expected = Invoice(
            billing_date=datetime.date(2019, 1, 1),
            customer=customer,
            details=details,
            total=details.total())

        assert InvoiceUseCase.publish('user_id') == expected

        # CustomerGatewayのgetメソッドが指定した引数'user_id'で1回呼ばれたことの検証
        customer_gateway_mock.assert_called_once_with('user_id')
        # SalesHistoriesGatewayのgetメソッドが指定した引数'user_id'で1回呼ばれたことの検証
        sales_histories_gateway_mock.assert_called_once_with('user_id')

MockitoやMockKでclassメソッドをモックしたい時には以下のように
when(モックしたいクラスオブジェクト.モックしたいメソッド).thenReturn(返したい値);
every { モックしたいクラスオブジェクト.モックしたいメソッド } return 返した値
と書くことができると思いますが、 pytest-mockでは
mocker.patch.object(モックしたいクラスオブジェクト, 'モックしたいメソッド', return_value=返したい値)
または
mocker.patch('モックしたいクラスオブジェクト.モックしたいメソッド', return_value=返したい値)
と書くことができます。

引数に応じて返す値を変えたい時

上記で書いていたものは全てreturn_value を使っており、 モックしたいメソッドは毎回固定の値を返すように設定していました。
ですが、メソッドに渡された引数に応じて返す値を変えたい時はないでしょうか?
そういう場合は、side_effect を使います。
side_effect を使うと、モックしたいメソッドの代わりに side_effect で定義したメソッドが呼び出され(引数はモックしたいメソッドと合わせる)、そのメソッドが返す値がモックの返す値として使われます。


次に、顧客情報を扱うgatewayとgatewayのテストを書いてみました。
このgatewayで行いたいことは以下です。

  • 複数のuser_idを受け取り、それに対するそれぞれの顧客情報の集合体(Customers)を返す
class CustomerGateway:

    @classmethod
    def get_customers(cls, user_ids):
        return Customers(list(map(
            lambda user_id: CustomerDriver.find_by(user_id), user_ids)))
    def test_get_customers(self, mocker):
        def find_by_id(user_id):
            if user_id == 'user_id1':
                return Customer(
                    id='user_id1',
                    name='Alice',
                    zip_code=77777,
                    address='Anaheim, CA',
                    number=123456789)
            elif user_id == 'user_id2':
                return Customer(
                    id='user_id2',
                    name='Bill',
                    zip_code=88888,
                    address='New York, NY',
                    number=111111111)
            else:
                self.fail('invalid user_id!!')

        mocker.patch.object(
            CustomerDriver,
            'find_by',
            side_effect=find_by_id)

        assert CustomerGateway.get_customers(['user_id1', 'user_id2']) == Customers([
            Customer(
                id='user_id1',
                name='Alice',
                zip_code=77777,
                address='Anaheim, CA',
                number=123456789),
            Customer(
                id='user_id2',
                name='Bill',
                zip_code=88888,
                address='New York, NY',
                number=111111111)
        ])


mockerについて

mockerはPythonの標準ライブラリであるmockライブラリの薄いラッパーで、unittestで提供しているモックパッケージと同じ引数をサポートしており、 テストメソッドに引数として渡すことで使うことができます。
また、mockerの便利なところはpatchしたクラスメソッドを初期化する必要がないところです。 mockerでモックしたインスタンスは実行対象のテストメソッドの実行後に自動的にリセットされます。

mocker.patch.object()mocker.patch() はどちらを使ってもモックすることは可能ですが、両者の違いは
mocker.patch.object() は外部から注入されたインスタンスに対してモックしますが、 mocker.patch() では渡された文字列から内部で対象のクラスとメソッドの参照を取得しモックします。


上で書いたgatewayに、指定されたuser_idを受け取り対応する顧客情報(Customer)を返すメソッドを追加して
mocker.patch.object()mocker.patch() のそれぞれを使ってテストを書いてみました。

class CustomerGateway:

    @classmethod
    def get(cls, user_id):
        return CustomerDriver.find_by(user_id)
<mocker.patch.object()を使った場合>
class TestCustomerGateway:

    def test_get_customer(self, mocker):
        customer = Customer(
            id='id001',
            name='Alice',
            zip_code=77777,
            address='SEATTLE USA',
            number=123456789)
        # mocker.patch.object(
        #    モックしたいクラスオブジェクト,
        #    'モックしたいメソッド',
        #    return_value=返したい値)
        m = mocker.patch.object(
            CustomerDriver,
            'find_by',
            return_value=customer)

        assert CustomerGateway.get('user_id') == Customer(
            id='id001',
            name='Alice',
            zip_code=77777,
            address='SEATTLE USA',
            number=123456789)

        m.assert_called_once_with('user_id')
<mocker.patch()を使った場合>
class TestCustomerGateway:

    def test_get_customer(self, mocker):
        customer = Customer(
            id='id001',
            name='Alice',
            zip_code=77777,
            address='SEATTLE USA',
            number=123456789)
        # mocker.patch(
        #    'モックしたいクラスオブジェクト.モックしたいメソッド',
        #    return_value=返したい値)
        m = mocker.patch(
            'app.main.driver.customer_driver.CustomerDriver.find_by',
            return_value=customer)

        assert CustomerGateway.get('user_id') == Customer(
            id='id001',
            name='Alice',
            zip_code=77777,
            address='SEATTLE USA',
            number=123456789)

        m.assert_called_once_with('user_id')

終わりに

初めてPythonのテストを書いた私がハマったことをつらつらと書いてみましたが、
この記事が私と同じように初めてPythonのテストを書く方のお役に立てれば幸いです。

参考資料

ペアプロと育休の取得しやすさの関係について

こんにちは。SPEEDA開発チームの鈴木です。

昨年一児(娘)の父になりまして、凄い勢いで変化していく様子に喜んだり困ったりしながら過ごしております。
色々できることが増えると嬉しいのですが、それは同時にいたずらの幅が広がることも意味するんですよね。例えばものを引っ張ることを覚えたのは嬉しいのですが、私の髪の毛をひっぱってむしるのはやめていただきたい。そんな感じです。

f:id:kenji-suzuki:20191014235944p:plain:w250
髪をむしる娘の図。言葉は通じない。

今回はそんなうちの子が産まれたときに取得した育休の話をしたいと思います。
育休の話とはいっても育休をハックする話とか育児アプリとかの話ではなく、「育休を取得しやすい(と私は思う)SPEEDA開発チームの環境」についての話をします。
本編がそこそこ長い(スミマセン)ので前置きはここら辺で切り上げることとします。

男性の育休取得率

さて突然ですが、2018年度の日本における男性の育休取得率がどのくらいかご存知でしょうか?

6.16%です。

これは過去最高の取得率だそうです。
ちなみに同年の女性の育休取得率が82.2%だそうなので、過去最高とはいえども残念ながらまだまだ低い数値といえるのではないでしょうか。

このような状況の育休ですが、SPEEDA開発チームでは昨年私を含めた2人が育休を取得しています。
母数が少ないので一概に「育休取得率が高い」とは言えないですが、間違いなく育休を取得しやすい環境であると思いますし、だからこそ私は育休を取得したので、今回は私たちの環境の紹介を兼ねて次のような話をしようと思います。

1. 男性が育休を取得しなかった理由  
2.「1」の理由の原因を考える
3.「2」の原因を解決するにはどうしたらいいか  
4. SPEEDA開発チームにおいてはどうしているか  

男性が育休を取らなかった理由

では、まず一般的にどういう理由で育休が取られていないのか、2018年度のデータを見てみましょう。

f:id:kenji-suzuki:20191003170319p:plain
育休を取得しなかった理由2018

これを見ると、男性が一体どういう理由で育休を取得しなかったかがわかります。
(出展: 三菱UFJリサーチ&コンサルティング「平成29年度仕事と育児の両立に関する実態把握のための調査研究事業」)

Top3はこのようになっています。

1位: 業務が繁忙で職場の人手が不足していた(38.5%)
2位: 職場が育児休業を取得しづらい雰囲気だった(33.7%)
3位: 自分にしかできない仕事や担当している仕事があった(22.1%)

今回はこのうち2位の「職場が育児休業を取得しづらい雰囲気だった」と3位の「自分にしかできない仕事や担当している仕事があった」にフォーカスして話を展開したいと思います(人手不足の話は採用や業務効率化に絡んだ話だと思いますが、そこについては別軸の問題だと思いますので今回は言及しません)。

育休を取らなかった理由の原因を考えてみる

次に、上記2つの理由について私なりに原因を考えてみたいと思います。

職場が育休を取得しづらい雰囲気になっている原因

なぜ育休を取得しづらい雰囲気になってしまっているのでしょうか。
原因を私なりに考えてみましたが、このようなことが考えられるのではないかと思います。

1.普通の休暇すら取りづらい
普通に1日2日の休暇を取ることが難しい環境の場合、ある程度まとまった期間になるであろう育休は余計取りづらく感じることかと思います。
どういう場合に休暇が取りづらくなるのか、2つ思い当たります。
1つ目は、自分が担当している仕事に期限があり、休むことで間に合わなくなったり、後々辛くなったりと自分が困るケースです。
2つ目は、自分が休むと自分の仕事を周囲の誰かが余分に担当することになり、迷惑がかかるケースです。
1つ目のケースで休んだ分を自分でカバーできずに間に合わなくなった場合は、2つ目のように他の誰かがフォローにまわり結局周囲の負荷が増えることが考えられます。
そういうことを考えていくとやはり休みづらくなるのではないでしょうか。
また、こういった懸念は想像に過ぎず実際は自分が休んだところで大した迷惑はかからないのかもしれませんが「安心して休める仕組み」が整っていなければ心理的に休みが取りづらいということはあるでしょう。

2.周りの人が普通の休暇すら取らない
自分だけではなく、周りの人も「1」と同じように自分が困ったり、周囲への迷惑を気にして休みを取得しないような環境では休みづらくなるかと思います。
お互いをお互いで縛り合っている感じです。同調バイアスによる負の連鎖と言えるかもしれません。
休暇と同様に、子供が産まれたのに誰も育休を取得しないような環境では遠慮してしまう(遠慮しなければならない気持ちになる)のではないでしょうか。

3.男性の育休に理解がない
周りの人たちが男性の育休に対して理解がない場合、育休は取得しづらい雰囲気になるでしょう。

どうすれば育休を取得しやすい環境になるか

私が挙げた原因はどれも環境が原因となっていますが、どうしたら育休を取得しやすい雰囲気を作れるでしょうか。
「1」は色々な解決方法があるでしょうが、上述のとおり休んだ場合に自分を含めて誰も困らないような「安心して休める仕組み」があれば解決しそうです。
「2」の解決には「1」の仕組みの存在に加えて実際その仕組みが働いている必要があります。
「3」は「1」の仕組みで部分的に解決しそうです。なぜなら「3」のような人たちが育休に反対する理由は”育休を取得することによって自分や他の人たちに迷惑がかかる”ということにあったりするからです。
「誰かが迷惑するから育休には反対」とは言えても「誰にも迷惑はかからないが育休には反対」とは言えないのではないでしょうか。

このように考えてみると 「安心して休める仕組み」が存在して運用されていることが、育休を取得しづらい雰囲気を解消する方法の一つになりそうです。

安心して休める仕組みとは

休みを取得しづらい理由として「休むと自分が困る」というものがありました。
なぜ自分が休むと困る状況になっているのでしょうか。それは「仕事が個人に対してアサインされている」からではないでしょうか。
そもそも自分がやらないといけない仕事なので、誰かに代わってもらいづらいというわけです。
このような環境では他の人も同様に「自分の仕事」をもっているでしょうから。
では「仕事がチーム対してアサインされている」ならば問題はないのでしょうか。
いいえ。 チームで仕事をしていても、仕事が属人化しないようにしないと今回の文脈では結局困ることになります。
仕事が属人化していると、計画的に休む場合は引き継ぎが発生するでしょうし、突発的に休むなら引き継げない分を自力で調査したりする手間が発生するからです。
ですので「仕事がチームに対してアサイン」されており、属人化も防がれていなければなりません。

SPEEDAの開発チームではどうしているか

SPEEDAの開発チームでは基本的に「仕事はチームに対してアサイン」されます。
そして「ペアプロ」が属人化を防ぐのに役立っています(やっと本題!)。
(属人化を防ぐためだけにペアプロをしているわけではないことを補足しておきます。ペアプロには属人化の軽減以外にも沢山のメリットがあるんです。ただ今回の話では「属人化」の観点にフォーカスします。)

ペアプロについて具体的に

SPEEDAの開発チームは、すべてのチームがほぼ100%ペアプロで作業しています(ペアプロそのものについての詳しい説明は割愛させていただきます) 。
そしてプログラミング以外(例えば採用活動)でもペア作業をします。
更にペアを組むメンバーを一日のうち何度も入れ替えており、様々なメンバーと様々な開発ストーリーに取り組むことになります。
領域もUI/UX含めたフロント周りから、バックエンド、インフラ、CI/CDなどすべてを皆で担当します。
その結果として、チームの中で「フロント部分はAさんしか知らない」「バックエンド部分はBさんしか知らない」という属人化が起きなくなっています。

チームの人数が多く、かつ小規模なストーリーの場合、自分が担当する前にストーリーが終わってたということはありますが、自分しか知らないという状況にはなりません。
またチームの人数が奇数の場合は一人作業※が発生しますが、その場合は一人でも十分なストーリーを選ぶような工夫をしています。 ※一人で作業するというのは普通かもしれませんが私たちの場合ペアが基本なため一人は特別な感じです。
そのため、誰かが休んで困るということが基本的にありません。
なので用事があれば休みますし、周りがそうなので上述のように「周りが休まないから休みづらい」ということがありません。

ペア作業のイメージ

ここで補足としてペア作業についてイメージしやすいように絵で表してみることにします。
あるチームにメンバーとして、Aさん、Bさん、Cさん、Dさんがいるとします。
f:id:kenji-suzuki:20191006232514p:plain

ペアは、AさんとBさん、CさんとDさんという組み合わせとします。
環境はこのような感じです。

f:id:kenji-suzuki:20191014011451p:plain
ペアプロ環境

一つのデスクトップPCにモニター2つとキーボード2つが接続されています。
マウスが見当たらないのは画像の手抜きではなく、Thinkpadのトラックポイント付きキーボードを使っているためです。
各々に対してキーボードが存在するため、ドライバーとナビゲーターの役割がスムーズに交代できます(キーボードが一つしかなかったら、心理的にキーボードを渡してもらったり渡したりというのがやりづらくなるかと思います)。
モニターはミラーリングされていたり左右で分割されていたり、ペアによってやりやすいように変えています。

CさんDさんペアも同じ構成で作業します。
f:id:kenji-suzuki:20191014011751p:plain

作業の流れ

時間は13時、AさんとBさんペアはストーリーXに着手します。CさんとDさんペアはストーリーYに着手します。
f:id:kenji-suzuki:20191006232845p:plain

時間が14時になりました。ここでペアを入れ替えます。1コマの時間(ペア交代までの時間)は、チームで話し合って決めます。この例では1時間で交代するものとします。今度はDさんAさんペア、BさんCさんがペアになりました。横にスライドする形ですね。
f:id:kenji-suzuki:20191006234446p:plain
入れ替えの際は、作業開始時にいまどういう状況なのかの共有が行われることが多いです。
(省いていますが、本当は適宜休憩を入れます(休憩超大事!))
また、OSやディレクトリ構成などは意図的に統一しているため、別のマシンに使ったとき困りません。

時間が15時になりました。またペアを入れ替えます。CさんDさんペアと、AさんBさんペアです。
f:id:kenji-suzuki:20191006235005p:plain
これで全員がストーリーX、ストーリーY両方に少しずつ関わったことになり、特定のメンバーしかわからないということがなくなりました。
複数のストーリーに少しずつ関わるということは、一つのストーリーに100%関われないことも同時に意味し、自分の知らないコードが存在し得るようになります。
こういった問題に対しては、他のメンバーに伝えた方がよさそうなことについて(例えば設計の部分)は都度々々共有したり相談したりしながら進めることで対応しています。
メンバーが増えていくとこういった共有も大変になってきたり、上記のペアの入れ替え方法だとペアを組めない人たち(例えばAさんとCさんはペアになれない)が出てきたりと、ペア作業については話題が尽きないのですが、今回の趣旨はペア作業によって属人化が軽減されているということであるため割愛させていただきます。

チームの入れ替え

チーム内でペアの入れ替えを行っていることは上述しましたが、チームをまたいだメンバーの入れ替えも行われています。
例えばXチームとYチームという2つのチームがあったとします。 このチームに対し、それぞれが担当している仕事はそのままに、メンバーを一部入れ替えるのです。 f:id:kenji-suzuki:20191007004951p:plain

↑を↓のように入れ替える。

f:id:kenji-suzuki:20191007005034p:plain
入れ替えの頻度は決まっておらず、リソース調整の結果であったり、タイミング的なものであったりします。
また、基本的には本人の意思が最大限に尊重され本人が希望するチームに移動することになります。強い理由があって同じチームに長く残るということもあります。
このような入れ替えが行われることにより、SPEEDAの開発チームにおける属人性は極めて少なくなっているといってもよいかと思います。
※ペアプロと同様に属人化を防ぐためだけに入れ替えをしているわけではなく、知見の共有であったりチームに新たな風を起こしたりと他にも色々メリットがあるから入れ替えが行われています。

最後に

私がSPEEDA開発チームにおいて、育休を取得しづらいとは思わなかった理由は、ユーザベースという会社自体も周りの人たちも男性の育休に理解があったということもありますが、ペアプロとチームの入れ替えによって「属人化」が軽減されており「自分が休むと誰かが困る」という心理的負担がなかったことも大きいと思います。
今回は主に「属人化」という点にフォーカスしてペアプロの話をしましたが、ペアプロがもたらす良い作用は他にも沢山あるので、(いきなりは難しいかもしれませんが)興味がわいた方は是非ペアプロを取り入れてみてはいかがでしょうか。
いきなりガッツリではなく、小規模に始めたり、短時間やってみたりするだけでも「良さ」がわかるかもしれません。

Ringアプリケーションで例外をいい感じにハンドリングする方法(Ductでの解説も含む)

こんにちは!こんにちは!SPEEDA開発チームのあやぴーです。 社内のClojureを使ったAPIにおいて、「例外をうまくハンドリングしたいんだけど…」という話が出てきたので、今回はRingアプリケーションにおける例外のハンドリング方法について解説します。また、昨今Ductを使ってアプリケーションを作る機会が増えているので、それについては最後の方に解説をします。

また具体的なコードはGitHubにあるので、参考にしてみてください。

https://github.com/ayato-p/exception-handling

この記事で解説する具体的な内容については以下の通り

  • 例外のハンドリングにRingミドルウェアを使う
  • 例外によってレスポンスを変更する
  • Ductアプリケーションへの適用方法
  • まとめ

例外のハンドリングにRingミドルウェアを使う

Ringアプリケーションの特にRingハンドラー(以下、ハンドラー)部分で例外が出る場合について考えてみます。簡単のために以下のように、例外を投げる処理do-somethingを実行するハンドラーthrow-exception-handlerがあることにします。

(ns demo.core
  (:require [ring.util.response :as response]))

(defn- do-something []
  (throw (ex-info "err" {:exception/type :server-error})))

(defn throw-exception-handler [req]
  (let [res (do-something)]
    (response/response res)))

本来であればdo-somethingが適当な値を返して、それをハンドラーのレスポンスとして返すところですが、この例ではdo-somethingが例外を起こすため、適切なレスポンスを返すことができません(Ringアダプターの実装にもよりますが、多くの場合は500エラーが返ると思います)。このハンドラーの挙動は以下のテストで確認することができます。

(ns demo.core-test
  (:require [clojure.test :as t]
            [demo.core :as sut]
            [ring.mock.request :as mock]))

(t/deftest throw-exception-handler-test
  (t/testing "throw-exception-handlerが例外を返すこと"
    (let [req (mock/request :get "/err")]
      (t/is
       (thrown-with-msg? clojure.lang.ExceptionInfo #"err"
                         (sut/throw-exception-handler req)))

      (let [data (try
                   (sut/throw-exception-handler req)
                   (catch clojure.lang.ExceptionInfo e
                     (ex-data e)))]
        (t/is (= {:exception/type :server-error}
                 data))))))

このようにハンドラーの中で例外が発生する場合、各ハンドラーやその手前のレイヤーで適切にハンドリングして、例外をハンドラーの外側に伝播させないというのも場合によってはやると思います。しかし、予期しない例外や一般的な例外などに対してすべてのハンドラーで対応するのはあまりにも面倒です。そういった例外をハンドルするための、いい感じの機能が欲しいところです。

Javaの場合、JAX-RSのExceptionMapperあたりが今回欲しい機能に該当しそうです。Ringには残念ながら同じようなものはありません。そのため、少しだけ考えてみることにします。上記のテストコードがヒントになりそうです。上のコードのようにthrow-exception-handlerにリクエストマップを渡すようなコードをtry~catchで囲んで、例外を吐き出したときのみ例外のレスポンスマップを返すように書くことを考えれないでしょうか。

(try
  (throw-exception-handler req)
  (catch Exception e
    (-> (response/response "Internal Server Error!!")
        (response/status 500))))

このようにハンドラーを実行して、その結果次第でレスポンスに影響を与える方法を僕らは既に知っているはずです。Ringミドルウェア(以下、ミドルウェア)です。次のようなミドルウェアwrap-exception-handlerを考えてみます。

;; demo.core
(defn wrap-exception-handler [handler]
  (fn exception-handler [req]
    (try
      (handler req)
      (catch Exception e
        (-> (response/response "Internal Server Error!!")
            (response/status 500))))))

これはハンドラーを受け取って、新しいハンドラーを返します。新しいハンドラーは元のハンドラーに対して、自身が受け取ったリクエストを適用するだけで基本的には何もしませんが、例外が投げられたときにそれをキャッチして500のステータスコードを返すようにしています。

wrap-exception-handlerは以下のテストコードで機能していることを確認することができます。

;; demo.core-test
(t/deftest wrap-exception-handler-test
  (t/testing
      "例外を投げないハンドラーが実行されたら何もせずに元の結果を返すこと"
    (let [req (mock/request :get "/err")
          handler (constantly {:status 200
                               :body "Hello, world"})
          app (sut/wrap-exception-handler handler)]
      (t/is (= {:status 200
                :body "Hello, world"}
               (app req)))))
  (t/testing
      "例外を投げるハンドラーが実行されたらステータスコード500のレスポンスを返すこと"
    (let [req (mock/request :get "/err")
          handler (fn [_] (throw (Exception. "err")))
          app (sut/wrap-exception-handler handler)]
      (t/is (= {:status 500
                :body "Internal Server Error!!"
                :headers {}}
               (app req))))))

例外によってレスポンスを変更する

例外をハンドルすることができたので、今度は例外の種類によって返すレスポンスを変化させてみます。既に想像がついていると思いますが、try~catchで掴む例外を複数用意すれば簡単にレスポンスを変えることができます。やってみましょう。

以下ではIllegalArgumentExceptionのときに500clojure.lang.ExceptionInfoのときはex-data:exception/typeの値によってステータスコードを変えています。また、対応できていない例外に対しては500を返しつつも、標準出力に"Unhandled Exception:"と流すようにしています。

(t/deftest wrap-exception-handler-test
  ;; "例外を投げないハンドラーが実行されたら何もせずに元の結果を返すこと"

  (t/testing
      "例外を投げるハンドラーが実行されたら適切なエラーコードを返すこと"

    (t/testing "IllegalArgumentExceptionのとき500を返す"
      (let [req (mock/request :get "/err")
            handler (fn [_] (throw (IllegalArgumentException. "err")))
            app (sut/wrap-exception-handler handler)]
        (t/is (= {:status 500
                  :body "Internal Server Error!!"
                  :headers {}}
                 (app req)))))

    (t/testing "対応できていない例外は500を返す"
      (let [req (mock/request :get "/err")
            handler (fn [_] (throw (NullPointerException. "err")))
            app (sut/wrap-exception-handler handler)]
        (t/is (= {:status 500
                  :body "Internal Server Error!!"
                  :headers {}}
                 (app req)))
        (t/is (str/starts-with?
               (with-out-str (app req))
               "Unhandled Exception: java.lang.NullPointerException"))))

    (t/testing "ExceptionInfo"
      (t/testing ":exception/typeが:server-errorのとき500を返す"
        (let [req (mock/request :get "/err")
              handler (fn [_] (throw (ex-info "err" {:exception/type :server-error})))
              app (sut/wrap-exception-handler handler)]
          (t/is (= {:status 500
                    :body "Internal Server Error!!"
                    :headers {}}
                   (app req)))))

      (t/testing ":exception/typeが:not-foundのとき400を返す"
        (let [req (mock/request :get "/err")
              handler (fn [_] (throw (ex-info "err" {:exception/type :not-found})))
              app (sut/wrap-exception-handler handler)]
          (t/is (= {:status 404
                    :body "Not Found!!"
                    :headers {}}
                   (app req))))))))

こうなるようにwrap-exception-handlerを実装すると、以下のようになります。

(defn- internal-server-error-response []
  (-> (response/response "Internal Server Error!!")
      (response/status 500)))

(defn- not-found-response []
  (-> (response/response "Not Found!!")
      (response/status 404)))

(defprotocol ExceptionToResponse
  (->response [e]))

(extend-protocol ExceptionToResponse
  Exception
  (->response [e]
    (println "Unhandled Exception:" (type e))
    (clojure.stacktrace/print-stack-trace e)
    (internal-server-error-response))

  IllegalArgumentException
  (->response [e]
    (internal-server-error-response))

  clojure.lang.ExceptionInfo
  (->response [e]
    (let [{t :exception/type} (ex-data e)]
      (case t
        :server-error
        (internal-server-error-response)
        :not-found
        (not-found-response)))))

(defn wrap-exception-handler [handler]
  (fn exception-handler [req]
    (try
      (handler req)
      (catch Exception e
        (->response e)))))

500404のレスポンスだけの関数(internal-server-error-response, not-found-response)を作りました。また実際に例外をふりわけるところはExceptionToResponseというプロトコルに任せて、それぞれの例外ごとに実装をできるようにしています。これによってwrap-exception-handlerでやっていることは非常に明快になりました。

このようにRingミドルウェアを応用することで、例外ごとに任意のレスポンスを返すことができました。

Ductアプリケーションへの適用方法

上記のwrap-exception-handlerを適用するDuctアプリケーションに適用するのは非常に簡単です。まずはプロジェクトを用意するところから。次のコマンドを使ってDuctプロジェクトの雛形を作ります。

$ lein new duct demo --to-dir exception-handling-api -- +api +ataraxy

今回はAtaraxyというデータでルーティングを記述できるライブラリを利用します。これはDuctで既にモジュール化されているため、簡単に使い始めることができます。

次に先程のwrap-exception-handlerを使って、Ductから使えるコンポーネントを用意します。

(ns demo.middleware.exception-handler
  (:require [ring.util.response :as response]
            [integrant.core :as ig]))

;; 中略

(defn wrap-exception-handler [handler]
  (fn exception-handler [req]
    (try
      (handler req)
      (catch Exception e
        (->response e)))))

(defmethod ig/init-key :demo.middleware/exception-handler [_ _]
  wrap-exception-handler)

wrap-exception-handlerの実装については、既に説明したものをそのまま利用します。そして、ミドルウェアのコンポーネント:demo.middleware/exception-handlerでは、単にwrap-exception-handlerを返すようにします。

次に、このミドルウェアを適用するには、config.ednに以下のような記述をします。大事なところは:duct.module/ataraxyのキーに対応するマップです。具体的な記法の説明については、Duct module.ataraxyのREADMEに譲りますが、Ataraxyのシンタックスでいうresult部に対してメタ情報として任意のミドルウェアを指定することで先程のミドルウェアのコンポーネントを適用することができます。

{:duct.profile/base
 {:duct.core/project-ns demo

  ;; Middlewares
  :demo.middleware/exception-handler {}

  ;; Handlers
  :demo.handler.throw-exception/not-found {}
  :demo.handler.throw-exception/server-error {}}

 :duct.profile/dev   #duct/include "dev"
 :duct.profile/local #duct/include "local"
 :duct.profile/prod  {}

 :duct.module/ataraxy
 {"/" ^:exception-handler
  {"not-found" [:throw-exception/not-found]
   "server-error" [:throw-exception/server-error]}}

 :duct.module/logging {}
 :duct.module.web/api
 {}}

まとめ

ClojureでWebアプリケーションを作っても、簡単に例外のハンドリングはできるよ!

【kubernetes / Helm】大量のCronJobに悩む貴方に送るプラクティス

はじめに

こんにちは! UZABASE SPEEDA SRE teamの生賀id:skikkh(@skikkh)です。

最近あった嬉しかったことは、自分が翻訳した日本語がkubernetesのCronJob - Kubernetesページに反映されていたことです。

閑話休題、弊社SPEEDAサービスでは大量のバッチジョブがHinemosを起点としてVM上で動作しています。 SREチームではこのようなジョブ群を徐々にサーバから切り離して、コンテナライズを進めています。

そんな大量にあるジョブですが、環境変数だけが異なっているものも多数あり、実行環境 x 環境変数と環境が異なると掛け算式に増えていきます。 これをkubernetesのCronJobでyamlハードコーディングすると容易に1000行を超えてしまい、管理上のコストも含め現実的ではありません。1

そこで、kubernetesパッケージマネージャーを使用することにしました。その選択肢としてはHelmKustomizeなどがあります。 これらを利用することで環境毎に異なる設定値のリソースが作成できたり、複雑な依存関係を持つkubernetesリソース群をChartという一つのフォーマットにまとめることができます。

今回、要件を満たすエントリが見当たらなかった為、利用事例として投稿させていだきます。

目次

Kustomizeの場合

まずはKustomizeで同様の利用を想定した際の構成を見てみましょう。

下記ディレクトリ構成は公式のKustomizeのGithubの、サンプルディレクトリ構成から拝借しました。

Kustomizeのサンプルディレクトリ構成

~/someApp
├── base
│   ├── deployment.yaml
│   ├── kustomization.yaml
│   └── service.yaml
└── overlays
    ├── development
    │   ├── cpu_count.yaml
    │   ├── kustomization.yaml
    │   └── replica_count.yaml
    └── production
        ├── cpu_count.yaml
        ├── kustomization.yaml
        └── replica_count.yaml

staging, develop, production環境といった環境差分をoverlaysで表現するのはKustomizeの方がいいかもしれませんが、Kustomizeの基本的なユースケースに照らし合わせてジョブを作成するとなると、次の例ようにCronJob毎にディレクトリを切る必要があり、管理コストが嵩んでしまいます。

Kustomizeで多数CronJobを作成する

例えば24個のAジョブと24個のBジョブ計48個作成する場合、overlays下に48個のディレクトリ構成ができてしまいます。以下がサンプルになります。

~/someApp
├── base
│   ├── cronjob.yaml
│   └── kustomization.yaml
└── overlays
    ├── 0101-job-a
    │   ├── category.yaml
    │   ├── kustomization.yaml
    │   └── table.yaml
    ├── 0102-job-a
    │   ├── category.yaml
    │   ├── kustomization.yaml
    │   └── table.yaml
    ├── 0103-job-a
    │   ├── category.yaml
    │   ├── kustomization.yaml
    │   └── table.yaml
    │      ︙ # 増えるだけ用意しないといけなくなる
    └── 0224-job-b
        ├── caterory.yaml
        ├── kustomization.yaml
        └── table.yaml

バッチジョブが20を超えてくるとなるとディレクトリで分割するのは現実的に厳しいと思います。

参考:Kustomize で CronJob を同一テンプレートからスケジュール毎に生成する

以上の理由から、KustomizeではなくHelmを利用することを決めました。

Helmの場合

Helmではアクションと呼ばれる制御構造によってリストをループ処理することができます。したがって、今回のケースではこちらを採用することにしました。

Helm, Tillerのバージョンはv2.14.3を使用しています。

今回の要件として、

  • 同一のテンプレートをベースとして、
  • DBのテーブル毎に含まれる、
  • 複数の地域情報を取り出し処理ができる2

バッチジョブを作成する必要がありました。

これを実現するためには先述したループ処理を行う必要があります。

実現方法としては「ループ処理をネストすれば可能」というのが答えなのですが、少しだけハマりどころがあったので、それも合わせてお話できればと思います。

通常のCronJobをループ処理する場合、values.yamlに回したい変数のリストを作成し、rangeを入れればループ処理が可能です。

単一のリストを利用してCronJobをループ処理する

まずはスケジュール毎にCronJobを作成したい場合を想定してみましょう。 以下の実行例ではスケジュール毎にスケジュールのインデックスをechoで出力するという設定をyamlで行います。動作確認したい場合はhelm testを利用しましょう。

test-schedule-cj.yaml

{{- range $index, $schedule := .Values.global.schedules }}
--- # 複数作成の為に必須
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: test-{{ $index | add1 }} # CronJob毎に一意になるような名前をつける必要がある
  namespace: {{ $namespace }}
  labels:
    chart: "{{ $chartName }}-{{ $chartVersion }}"
    release: "{{ $releaseName }}"
    heritage: "{{ $releaseService }}"
spec:
  schedule: {{ $schedule }}
  suspend: false
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccount: {{ $serviceAccount }}
          containers:
          - image: "{{ $imageRepository }}:{{ $imageTag }}"
            name: test
            # テストで検証するのであれば、busyboxイメージでechoを出力します。  
            args:
            - /bin/sh
            - -c
            - date; echo Index count is $(INDEX).
            env:
              name: INDEX
              value: {{ $index | add1 }}
          restartPolicy: OnFailure
{{- end }}

因みにyamlの可読性の為、最初に以下のような代入を行っています。(以降、省略)

test-schedule-cj.yaml

# global
{{- $namespace := .Values.global.namespace }}
{{- $serviceAccount := .Values.global.serviceAccount }}
{{- $imageRepository := .Values.global.imageRepository }}
{{- $imageTag := .Values.globa.imageTag }}
# chart
{{- $chartName := .Chart.Name }}
{{- $chartVersion := .Chart.Version }}
{{- $releaseName := .Release.Name }}
{{- $releaseService := .Release.Service }}

values.yamlの設定は以下のようになります。

vaules.yaml

global:
  # kubernetes
  namespace: test-ns
  serviceAccount: test-sa

  # image  
  imageRepository: busybox
  imageTag: latest

  schedules: [
    '0 0 * * *',  '10 0 * * *',
    '20 0 * * *', '30 0 * * *'
    ]

特に難しい部分はなく、rangeアクションで作成したいCronJobを囲むだけです。 ただ一つ注意しないといけない点として、CronJobの名前は一意になるようにしなければいけません。 その為、全ての名前が一意になるように命名規則を考えてつけましょう。 基本的にはループで回す変数名をつけるようにすれば大丈夫だと思います。3

作成後、以下のhelmコマンドを実行します。

$ helm upgrade --install job01 .

これでテーブルの数だけジョブを回すことができるChartのリリースができますね。

複数のリストを利用してCronJobをループ処理する

一つの変数の条件でループ処理ができたので、複数の変数のリストを使用してループする場合を考えてみましょう。 スケジュールのループ処理内にテーブルのループ処理をネストするだけで作成できます。 記述例としては以下のようになります。

test-schedule-table-cj.yaml

{{- range $index, $schedule := .Values.global.schedules }}
{{- range $table := .Values.global.tables }}
---
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: test-{{ $index | add1 }}
  namespace: {{ $namespace }}
  labels:
    chart: "{{ $chartName }}-{{ $chartVersion }}"
    release: "{{ $releaseName }}"
    heritage: "{{ $releaseService }}"
spec:
  schedule: {{ $schedule }}
  suspend: false
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccount: {{ $serviceAccount }}
          containers:
          - image: "{{ $imageRepository }}:{{ $imageTag }}"
            name: test
            args:
            - /bin/sh
            - -c
            - date; echo Index count is $(INDEX). Table name is $(TABLE_NAME).
            env:
              name: INDEX
              value: {{ $index | add1 }}
              name: TABLE_NAME
              value: {{ $tablel.name }}
          restartPolicy: OnFailure
{{- end }}
{{- end }}

values.yaml

global:
  # kubernetes
  namespace: test-ns
  serviceAccount: test-sa

  # image  
  imageRepository: busybox
  imageTag: latest

  # schedule info
  schedules: [
    '0 0 * * *',  '10 0 * * *',
    '20 0 * * *', '30 0 * * *'
    ]

  # table info
  tables: 
    - name: XxxTest
      category: walk
    - name: YyyTest
      category: walk
    - name: ZzzTest
      category: run

しかし、例のようにスケジュール×テーブルという変数でジョブを回そうとすると失敗してしまいます。 以下が失敗の際に出力されるエラーになります。

UPGRADE FAILED
Error: render error in "test-loop/templates/est-schedule-table-cj.yaml": template: test-loop/templates/est-schedule-table-cj.yaml:15:28: executing "test-loop/templates/test-schedule-table-cj.yaml" at <.Values.global.tables>: can't evaluate field Values in type interface {}
Error: UPGRADE FAILED: render error in "test-loop/templates/est-schedule-table-cj.yaml": template: test-loop/templates/est-schedule-table-cj.yaml:15:28: executing "test-loop/templates/est-schedule-table-cj.yaml" at <.Values.global.tables>: can't evaluate field Values in type interface {}

失敗の原因は.のカレントスコープが「.Values.global.schedules」を向いているためで、.Values.global.regionが習得できません。

回避策としては、ループの処理の際に、「常にグローバルスコープを持つ$を使用する」ことでこのエラーを回避できます。

したがって、以下のdiffの変更点のように最初の記述でループしたい変数を別の変数に代入することでスコープを変えないままネストしたループ処理ができるようになります。 また、CronJobのリソース名も一意にするため、テーブルの名前を更新しておきましょう。

test-schedule-table-cj.yaml

+ {{- $regions := .Values.global.tables }}

- {{- range $index, $schedule := .Values.global.schedules }}
- {{- range $table := .Values.global.tables }}
+ {{- range $index, $schedule := .Values.global.schedules }}
+ {{- range $table := $tables }} 

-   name: test-{{ $index | add1 }} 
+   name: test-{{ $index | add1 }}-{{ $table.name }} # 名前を一意にするため

しかし、これでもまだ十分ではありません。あくまでスケジュールのindexがほしいのではなく、スケジュール(cron)毎に習得される地域の変数をCronJobのmetadataや環境変数に代入したいのです。

スケジュールの時間毎に、異なる地域のバッチジョブを実行する

golangのSprig libraryを利用してリストを取得するようにしています。 このような形にしたのは、複数のバッチジョブを後述するsubchartに記述する際、values.yamlをDRYにするためです。 全てのリストを1つづつ取得する関数がなかったので、次のような形で再現しています。

test-region-table-cj.yaml

{{- $regions := .Values.global.tables }}

{{- range $index, $schedule := $schedules }}
{{- range $table := $tables }}

# スケジュール毎に地域のリストを取得する
{{- $region := slice $regions $index ( $index | add1 ) | first }}

---
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: test-{{ $region }}-{{ $table.name }} # indexではなく地域別で名前をつけている
  namespace: {{ $namespace }}
  labels:
    chart: "{{ $chartName }}-{{ $chartVersion }}"
    release: "{{ $releaseName }}"
    heritage: "{{ $releaseService }}"
spec:
  schedule: {{ $schedule }}
  suspend: false
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccount: {{ $serviceAccount }}
          containers:
          - image: "{{ $imageRepository }}:{{ $imageTag }}"
            name: {{ $name }}
            args:
            - /bin/sh
            - -c
            - date; echo This chart has region $(REGION) and table $(TABLE_NAME).
            env:
              - name: TABLE_NAME
                value: "{{ $table.name }}"
              - name: TABLE_CATEGORY
                value: "{{ $table.category }}"
              - name: REGION
                value: {{ $region | quote }}
          restartPolicy: OnFailure
{{- end }}
{{- end }}

values.yamlにはglobal.regionsを、scheduleのリスト長と同一のリストを加えましょう。4

values.yaml

+  regions: [japan, us, uk, china]

これで、地域別のDBのテーブルの情報が取得できるようになりましたね。

サブチャートを作成して、一連のワークフローを一つのChartで再現する

実際には上記のようなバッチジョブが後続に存在しているので、これらをひとまとまりにして扱う必要があります。 最後に、これらバッチジョブをひまとまりにして処理する方法を学びましょう。

複数のjob(Dockerイメージが別)を扱うためにsubchartを採用しました。 ServiceAccount関連のリソースやNetworkPolicyやNamespaceのようなグローバルなリソースは大本のChartにまとめ、各ジョブをsubchartに入れるという形を取ります。

values.yamlの項目で、globalの変数と、そうでないsubchart毎のローカル変数を使いわけることで実現できます。

以下にサンプルのディレクトリ構成を挙げます。

.
├── Chart.yaml
├── charts
│   ├── 0101-job
│   │   ├── Chart.yaml
│   │   └── templates
│   │       ├── _helpers.tpl
│   │       ├── xxx-cronjob.yaml
│   │       └── tests
│   │           └── test-xxx.yaml
│   ├── 0102-job
│   │   ├── Chart.yaml
│   │   └── templates
│   │       ├── _helpers.tpl
│   │       ├── yyy-cronjob.yaml
│   │       └── tests
│   │           └── test-yyy.yaml
│   └── 0103-job
│       ├── Chart.yaml
│       └── templates
│           ├── _helpers.tpl
│           └── zzz-cronjob.yaml
│           └── tests
│               └── test-zzz.yaml
├── templates
│   ├── NOTES.txt
│   ├── _helpers.tpl
│   ├── namespace.yaml
│   ├── networkpolicy.yaml
│   ├── secrets
│   │             ︙
│   └── serviceaccount.yaml
└── values.yaml

個別のyaml設定の注意点

各サブチャート毎に分化されたschedulesはグローバルの設定ではなく、個別のジョブ毎のローカルの設定になるため、各subchart内では、.Values.global.schedulesから.Values.schedulesのようにグローバルのスコープをローカルのスコープに変更しておきましょう。

+ {{- $category := .Values.schedules }}
- {{- $tables := .Values.global.schedules }}

フラグで実行するジョブバッチを選択する

後でsubhart毎にテストをするには、個別のsubchartだけを実行する必要があります。 そのため、enabledを各subchartに記述します。

以下のように各CronJobリソースを条件式{{- if .Values.enabled }}で挟んで、

test-subchart-cj.yaml

{{- if .Values.enabled }}
{{- range $index, $schedule := $schedules }}
{{- range $table := $tables }}
---
apiVersion: batch/v1beta1
︙
{{- end}}
{{- end}}
{{- end}}

values.yamlに以下の設定を入れます。

values.yaml

0101-job:
  enabled: true # 全てtrueにしておく
  imageRepository: xxx
︙

実行時に不必要なジョブに対してfalseのフラグを立てることで個別実行が実現できます。

$ helm upgrade --install jobs --set 0101-job.enabled=true --set 0102-job.enabled=false --set 0103-job.enabled=false .

上記は0101-jobだけがChartだけがデプロイされる例になります。

テストに関して1つ注意点があります。 テストを行う毎はテストの実行ジョブの順番が変わるのでご注意下さい。5

完成したvalues.yaml

最終的なvalues.yamlは次のようになります。

values.yaml

global:
  # kubernetes
  namespace: test-ns
  serviceAccount: test-sa

  # table info
  tables: 
    - name: XxxTest
      category: walk
    - name: YyyTest
      category: walk
    - name: ZzzTest
      category: run
     

  regions: [japan, us, uk, china]

  # test config
  tests:

    tables: 
      - name: XxxTest
        category: walk

    regions: [japan, china]

0101-job:
  enabled: true
  imageRepository: xxx
  imageTag: 1.0.0
  schedules: [
    '0 0 * * *',  '10 0 * * *',
    '20 0 * * *', '30 0 * * *'
    ]

0102-job:
  enabled: true
  imageRepository: yyy
  imageTag: 1.0.0
  schedule: '0 1 * * *'

0103-job:
  enabled: true
  imageRepository: zzz
  imageTag: 0.1.0

  schedules: [
    '0 2 * * *', '10 2 * * *',
    '20 2 * * *', '30 2 * * *'
    ]

おわりに

今回ループ処理化したCronJob Aが成功したらCronJob Bを実行すると言ったワークフローは単純なスケジュール(cron)でHelm Chart化しました。

その他ハマりどころとしてはvalues.yamlのsubchart名と一致しないChart名、ディレクトリ名になっているとチャートがデプロイされないという事例がありました。案外、見落とします。 名前を変更した際には確認するようにしましょう。

最後に注意点で、今回のエントリでは実運用で想定するようなTLSの暗号化通信やSecurityContext、Compute Resourcesなど省略しているので、それぞれの環境に合わせて設定していただればと思います。

以上!

仲間募集

ユーザベースのSPEEDA SREチームは、No Challenge, No SRE, No SPEEDA を掲げて業務に取り組んでいます。
「挑戦しなければ、SREではないし、SREがなければ、SPEEDAもない」という意識で、日々ユーザベースのMissionである、「経済情報で、世界をかえる」の実現に向けて邁進しています。

少しでも興味を持ってくださった方はこちらまで


  1. いわゆるwall of YAML

  2. 割り当てたい変数(今回でいうと地域)毎にスケジュール(cron)を生成します

  3. 63文字の制限があります

  4. 本来は順序が逆ですが、説明の便宜上このように書いています

  5. 恐らく前回の実行のリストから+1されています

XP祭りの裏側 〜大規模イベントの運営ってどんなことやるの?〜

こんにちは!SPEEDA開発チームの斎藤です。

先月9月21(土)に開催された開発者向けイベント「XP祭り」は皆様ご存知でしょうか。
今回は運営メンバーとして「XP祭り」に関わり、思いきりお祭りを堪能した私の視点から

  • 運営ってどんなことしてたの?(大変なの?)
  • 運営側だとどんな楽しいことがあるの?

といったことについて少しだけ語らせていただき、
イベントを創る側の魅力や楽しさが伝えられればと思います。

XP祭りとは?

毎年9月ごろに開催されるソフトウェア開発者向けの技術イベントです。
XP(エクストリームプログラミング)やアジャイル、スクラムなどのテーマを中心に、
講演者が培った知見、体験を元にした魅力的な講演、ワークショップが多数行われます。
参加費用は無料!今年は18回目で、来場者数220人と大盛況でした。

f:id:saito86:20190927210953j:plain

XP祭りの運営ってどんなことやるの?

運営側がやることを大きく分けると、
イベント当日までの準備と、当日の運営の2つに大別されます。

①イベント当日までの準備(毎月一回開かれる運営会議で進めていく)

  • イベントの開催日時、会場を決める
  • イベント開催の告知とwebサイトを用意する
  • 講演者を公募し、イベント当日のタイムテーブルを作る
  • 参加者を募集する
  • 各出版社様にお願いし、技術本を寄贈していただく(※) etc...

XP祭りでは毎年、各出版社様から技術本を多数寄贈していただいております。
これらは全て参加者および講演者の方に無料で配布させていただいております。 f:id:saito86:20190927210708j:plain

②イベント当日の運営

  • 会場のセッティング
  • 受付
  • 各講演の司会
  • 後片付け
  • 懇親会 etc...

色々やることがあって大変に見えるかもしれませんね。でも大丈夫です。
XP祭りは今年で18回目なので、既にどんなことをどうやるかの大筋が決まっています。

例えば、イベント会場は早稲田大学教授の鷲崎先生のご厚意で無償でお借りしています。
また、講演者の公募や当日のタイムテーブル作成も、今まで積み重ねてきたものをベースにみんなでワイワイ進めるので、 運営をやるのが初めての私でも特に大変だと感じることはありませんでした。

「運営になるべく負担がかからないようにする」
「運営がお祭りを楽しめるようにする」

ということを意識されているそうで、長くイベントを続ける秘訣を教えていただいたなあと思いました。

XP祭りの運営ってどんな楽しいことがあるの?

これは色々ありますが、大きく分けると2つあると感じました。

①お祭りの準備、開催の楽しさと達成感
学校の文化祭のようなイメージといって伝わるのか自信がありませんが、 お祭りの準備や運営って、参加してくれる方が笑って満足してくれると嬉しいですよね。 自分たちが携わったイベントに200人以上も参加者が集まってくれて、 「XP祭り楽しかった」という感想を聞いたりすると「やって良かった」と思いますね。
もちろん、当日は参加者でもあるため、自分の気になる講演やワークショップに参加しちゃってOKです。

②知り合いが増えて、コミュニティの輪が広がる
運営メンバーを中心に、講演者の方や参加者の方まで、 たくさんの方々とつながりができました。 やっぱり同じようなことに挑戦している仲間や、偉大な先人の方と話ができるのはとても心強いです。
困ったときに相談したり、良い知見が得られたときに共有したりと、コミュニティの強さって本当に素晴らしいなと思います。

f:id:saito86:20190927212902j:plain
お祭りの後はやっぱり懇親会!

終わりに

ほんの少しXP祭りの裏側を書かせていただきましたが、いかがでしたでしょうか。

  • 大規模イベントの運営でも実はそんなに大変じゃない
  • 運営として裏側からもお祭りを楽しめた

という2点が伝わって、少しでも興味を持っていただけたのなら幸いです。
(後はほんの少し勇気を出して飛び込むだけです。来年のXP祭り運営委員の会でお会いしましょう!)

謝辞

XP祭りの運営の皆様、ならびに講演、参加者の皆様、ありがとうございました。
皆様のおかげで、XP祭りは今年も最高に楽しいイベントになったのではないか思います。 また来年もどうぞよろしくお願いいたします!

【Clojure】Ductで始めるWebAPI開発

こんにちは!!SPEEDA開発チームの岡村です!!

私たちの開発チームでは、先日チームメンバの野口が書いたこちらの記事に書かれているように、チームメンバーの入れ替えが頻繁に行われます。

かく言う私も一ヶ月前に行われたチームシャッフルで、ClojureでDuctを使って開発を行うチームに移動しました。(Clojureほぼ未経験)

私がClojure初心者だからこそ、これからClojureとDuctを始めて見ようかなと思っている方に対し、お伝えできることがあると思い、今回はDuctでWebAPIを作成する方法をご紹介しようと思います。

Ductとは

Ductはライフサイクル管理を行うライブラリ、Integrantをベースとしたサーバサイドのフレームワークであり、アプリケーション構造は、Integrantのコンフィグレーションマップによって定義されます。

DuctとIntegrantの関係に関しては、弊社の中村がこちらの記事でも一度ご紹介しています。

作成物

CRUDの題材として定番な、TODO管理を下記リクエストで行えるAPIを作成していきます。

リクエストメソッド パス 内容
GET /todos 一覧を取得する
GET /todos/:id 指定したデータを取得する
POST /todos データを登録する
PUT /todos/:id 指定したデータを更新する
DELETE /todos/:id 指定したデータを削除する

プロジェクト作成

DuctプロジェクトはLeiningenを使って作成することができ、プロファイルを指定すると、
それに合わせプロジェクト構成や設定ファイルを自動で生成してくれます。

$ lein new duct todo +api +ataraxy +postgres

今回はAPIを作成し、ルーティングライブラリにAtaraxy、TodoデータをPostgresで管理するため、
+api +ataraxy +postgresと指定しました。

Todoを管理するデータベースも作成しておきましょう。
今回はDockerコンテナで作成しました。

docker run --name todo-db -p 5432:5432 -e POSTGRES_DB=todo-db -e POSTGRES_USER=username01 -e POSTGRES_PASSWORD=password01 -d postgres:11.5 

システムの起動

DuctはREPLを利用して開発を進めていくため、先ほど作成したプロジェクトに移動し、REPLを起動しましょう!

$ lein repl
nREPL server started on port 50077 on host 127.0.0.1 - nrepl://127.0.0.1:50077
REPL-y 0.4.3, nREPL 0.6.0
Clojure 1.10.0
OpenJDK 64-Bit Server VM 11.0.2+9
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e

REPLが起動したら、(dev)と入力し開発環境をロードします。

user=> (dev)
:loaded

その後、(go)とコマンドを入力するとシステムを起動することができるのですが、
DBへの接続情報をまだ記述していないため、下記の様なエラーが発生し、起動することができません。

dev=> (go)
:duct.server.http.jetty/stopping-server
Execution error (PSQLException) at org.postgresql.core.v3.ConnectionFactoryImpl/doAuthentication (ConnectionFactoryImpl.java:534).
サーバはパスワード・ベースの認証を要求しましたが、パスワードが渡されませんでした。
dev=> 

システムを起動させるため、DB接続情報を記述しましょう!
/todo/dev/resources配下にある、dev.ednファイルを以下の様に変更します。
このファイルはファイル名の通り、開発環境での設定を記述するファイルです。

{:duct.database/sql
 {:connection-uri "jdbc:postgresql://localhost:5432/todo-db?user=username01&password=password01"}
 }

Ductではednファイルにコンフィグレーションマップを書いていくことで、
アプリケーションの構造を表現することができます。

ここではduct.database/sqlというコンポーネントの初期化時に、データベースの接続情報が渡される用に定義しています。

書き換えた後に、再度システムを起動させてみましょう。

dev=> (go)
:duct.server.http.jetty/starting-server {:port 3000}
:initiated

上記の様に表示されていれば、システムが起動されています。 curlでリクエストを送ってみましょう。

$ curl http://localhost:3000
{"error":"not-found"}

errorが帰ってきますが、起動していることがわかります。

マイグレーション

ductのmoduleでマイグレーションツールとして提供されている、
migrator.ragtime を利用して、システムの起動時にテーブルの作成と、テストデータの投入が行われるようにしましょう。

先ほど編集したdev.ednを下記のように編集します。

{:duct.database/sql
 {:connection-uri "jdbc:postgresql://localhost:5432/todo-db?user=username01&password=password01"}

 :duct.migrator/ragtime
 {:migrations [#ig/ref :todo.migration/create-todos]}

 [:duct.migrator.ragtime/sql :todo.migration/create-todos]
 {:up ["CREATE TABLE todos (id SERIAL  PRIMARY KEY, title TEXT)"
       "INSERT INTO todos (title) values('test1')"
       "INSERT INTO todos (title) values('test2')"]
  :down ["DROP TABLE todos"]}
 }

システムを再起動させてみましょう。
REPLで(reset)と入力すると、再起動することができます。

dev=> (reset)
:reloading ()
:duct.migrator.ragtime/applying :todo.migration/create-todos#2cd2c3f9
:resumed

これでデータベースにテーブルが作成され、データが投入されました。

一覧取得処理の実装

まずはTodo一覧を取得する処理を実装していきましょう。
/todo/resources/todo/に存在する、config.ednを下記のように編集します。

{:duct.profile/base
 {:duct.core/project-ns todo

  :duct.router/ataraxy
  {:routes
   {[:get "/todos"] [:todo.handler.todos/list]}
   }

  ;; Handlers
  :todo.handler.todos/list {}

  }

 :duct.profile/dev   #duct/include "dev"
 :duct.profile/local #duct/include "local"
 :duct.profile/prod  {}

 :duct.module/logging {}
 :duct.module.web/api
 {}
 :duct.module/sql
 {}}

:duct.router/ataraxyコンポーネントに、 {[:get "/todos"] [:todo.handler.todos/list]}という記述を追加しました。
これは、/todosにGETリクエストが来た時、:todo.handler.todos/listコンポーネントで処理をするということを表しています。

:todo.handler.todos/list {}という記述も追加しました。これは:todo.handler.todos/listコンポーネントの初期化時に空のマップを渡すということを示しています。

ハンドラーの実装

続いてハンドラーを作成していきます。

/todo/src/todo/handlerに、todos.cljというファイルを作成し、以下を記述します。

(ns todo.handler.todos
  (:require [ataraxy.response :as response]
            [integrant.core :as ig]))

(defmethod ig/init-key ::list [_ _]
  (fn [_]
    [::response/ok {:message "OK!!!"}]))

Integrantでマルチメソッドとして定義されている、ig/init-keyを実装することにより、コンポーネントを作成することができます。

::listというのは、todo.handler.todos/listと同じ意味になります。
これにより、先ほどconfig.ednで書いた設定と処理を紐づけることができます。

戻り値は、Ataraxyを利用しているのでベクタで記述することができます。
一先ずリクエストが来たら :message "OK!!!"というマップを返すようにしました。

一旦システムを再起動し、実際にリクエストをしてみましょう!

dev> (reset)
:reloading (todo.handler.todos)
;;=> :resumed
$ curl  http://localhost:3000/todos
{"message":"OK!!!"}

レスポンスが帰ってくることを確認できました!

それでは、実際にデータベースからTodoリストを取得し、返すように実装を変えていきます。

/todo/resources/todo/config.ednを下記のように変更します。

{:duct.profile/base
 {:duct.core/project-ns todo

  :duct.router/ataraxy
  {:routes
   {[:get "/todos"] [:todo.handler.todos/list]}
   }

  ;;Handlers
  :todo.handler.todos/list {:db #ig/ref :duct.database/sql}

  }

 :duct.profile/dev   #duct/include "dev"
 :duct.profile/local #duct/include "local"
 :duct.profile/prod  {}

 :duct.module/logging {}
 :duct.module.web/api
 {}
 :duct.module/sql
 {}}

:todo.handler.todos/list {}と記述していた箇所を、
:todo.handler.todos/list {:db #ig/ref :duct.database/sql}に変更しました。

#ig/ref関数を利用し、:todo.handler.todos/listコンポーネントは、:duct.database/sqlコンポーネントに依存しているということを示しています。

:duct.database/sqlは、最初にdev.ednで記述したコンポーネントのことです。

これにより、:todo.handler.todos/listコンポーネントの初期化時に、
初期化済の:duct.database/sqlコンポーネントを受け取ります。

次に先ほど作成した、todos.cljを以下のように変更します。

(ns todo.handler.todos
  (:require [ataraxy.response :as response]
            [integrant.core :as ig]
            [todo.boundary.todos :as todos]))

(defmethod ig/init-key ::list [_ {:keys [db]}]
  (fn [_]
    (let [todos (todos/get-todos db)]
      [::response/ok todos])))

第二引数に、先ほどconfig.ednで定義したデータベースの接続情報を受け取り、
後で作成する、データベース接続を行うBoundaryにDB接続情報を渡して結果を返すようにしました。

init-keyは、第一引数にコンポーネントのキー(:todo.handler.todos/list)が、

第二引数にはコンフィグマップのコンポーネントに対応する以下のようなバリューが渡ってきます。

{:db #duct.database.sql.Boundary
 {:spec
  {:datasource #object[net.ttddyy.dsproxy.support.ProxyDataSource 0x11e3a8d net.ttddyy.dsproxy.support.ProxyDataSource@11e3a8d]}}}

バリューは:todo.handler.todos/list {:db #ig/ref :duct.database/sql}{:db #ig/ref :duct.database/sql}のことです。
#ig/refと記述しているため、初期化された:duct.database/sqlコンポーネントが渡ってきています。

データアクセス層の実装

続いて、実際にデータベースへの接続を行う層として、Boundaryを作成していきましょう!

/todo/src/todo/boundaryにtodos.cljというファイルを作成し、以下を記述します。

(ns todo.boundary.todos
  (:require
   [clojure.java.jdbc :as jdbc]))

(defprotocol Todos
  (get-todos [db]))

(extend-protocol Todos
  duct.database.sql.Boundary

  (get-todos [{:keys [spec]}]
    (jdbc/query spec ["SELECT * FROM todos"]))
  )

DBへの接続情報を受け取り、clojure.java.jdbcを利用してデータベースにアクセスしています。

システムを再起動し、リクエストを送ってみましょう!

curl http://localhost:3000/todos
[{"id":1,"title":"test1"},{"id":2,"title":"test2"}]

データベースから値を取得することができました!

登録処理の実装

次にデータベースへの登録処理を実装していきましょう。
流れは一覧取得処理を実装した時とほとんど同じです。

config.ednに/todosにPOSTでリクエストが送られてきた時の定義を記述します。

{:duct.profile/base
 {:duct.core/project-ns todo

  :duct.router/ataraxy
  {:routes
   {[:get "/todos"] [:todo.handler.todos/list]
    [:post "/todos" {body :body-params}] [:todo.handler.todos/create body]}
   }

  :todo.handler.todos/list {:db #ig/ref :duct.database/sql}
  :todo.handler.todos/create {:db #ig/ref :duct.database/sql}

  }

 :duct.profile/dev   #duct/include "dev"
 :duct.profile/local #duct/include "local"
 :duct.profile/prod  {}

 :duct.module/logging {}
 :duct.module.web/api
 {}
 :duct.module/sql
 {}}

先ほどと違うところは、HTTPリクエストからパラメータを取得しているというところです。
HTTPリクエストの情報はマップで渡ってきており、そこから登録に必要な:body-paramsの値を取り出しています。

ハンドラーの実装

次にhandlerを下記のように変更します。

(ns todo.handler.todos
  (:require [ataraxy.response :as response]
            [integrant.core :as ig]
            [todo.boundary.todos :as todos]))

(defmethod ig/init-key ::list [_ {:keys [db]}]
  (fn [_]
    (let [todos (todos/get-todos db)]
      [::response/ok todos])))

(defmethod ig/init-key ::create [_ {:keys [db]}]
  (fn [{[_ params] :ataraxy/result}]
    (let [result (todos/create-todo db params)
          id (:id (first result))]
      [::response/created (str "/todos/" id)])))

無名関数の引数が一覧検索の時と異なっています。
この無名関数の引数には下記のようなHTTPリクエスト情報のマップが渡ってきます。

{:ssl-client-cert nil, 
 :protocol HTTP/1.1, 
 :remote-addr 0:0:0:0:0:0:0:1, 
 :params {}, 
 :body-params {:title test3}, 
 :route-params {}, 
 :headers {user-agent curl/7.54.0, host localhost:3000, accept */*, content-length 18, content-type application/json}, 
 :server-port 3000, 
 :muuntaja/request #FormatAndCharset{:format application/json, :charset utf-8, :raw-format application/json}, 
 :ataraxy/result [:todo.handler.todos/create {:title test3}], 
 :content-length 18, :form-params {}, 
 :query-params {}, 
 :content-type application/json, 
 :character-encoding UTF-8, 
 :uri /todoss 
 :server-name localhost, 
 :query-string nil, 
 :muuntaja/response #FormatAndCharset{:format application/json, :charset utf-8, :raw-format */*}, :body #object[org.eclipse.jetty.server.HttpInputOverHTTP 0x74981d49 HttpInputOverHTTP@74981d49], 
:scheme :http, 
:request-method :post}

そのため:ataraxy/resultを分配束縛させています。

データアクセス層の実装

続いてBoundaryを編集しましょう!

(ns todo.boundary.todos
  (:require
   [clojure.java.jdbc :as jdbc]))

(defprotocol Todos
  (get-todos [db])
  (create-todo [db params]))

(extend-protocol Todos
  duct.database.sql.Boundary

  (get-todos [{:keys [spec]}]
    (jdbc/query spec ["SELECT * FROM todos"]))

  (create-todo [{:keys [spec]} params]
    (jdbc/insert! spec :todos {:title (:title params)}))

  )

jdbc/queryを使っても、もちろん実装できるのですが、jdbc/insert!を利用してみました。

これで登録処理の実装は完了です!

システムを再起動してリクエストを送ってみましょう!

curl -i -X POST http://localhost:3000/todos -d '{"title": "test3"}'  --header "Content-Type: application/json" 
HTTP/1.1 201 Created
Date: Thu, 29 Aug 2019 05:14:01 GMT
Location: http://localhost:3000/todos/3
Content-Type: application/octet-stream
Content-Length: 0
Server: Jetty(9.2.21.v20170120)

登録処理を行うことができました!

更新、削除、一件取得処理の実装

更新、削除、一件取得処理もこれまでと同様にして実装することができます!
そのため説明は割愛させていただきますが、最終的に実装は下記のようになります。

todo/resources/todo/config.edn

{:duct.profile/base
 {:duct.core/project-ns todo

  :duct.router/ataraxy
  {:routes
   {[:get "/todos"] [:todo.handler.todos/list]
    [:get "/todos/" id] [:todo.handler.todos/fetch ^int id]
    [:post "/todos" {body :body-params}] [:todo.handler.todos/create body]
    [:put "/todos/" id {body :body-params}] [:todo.handler.todos/update ^int id body]
    [:delete "/todos/" id] [:todo.handler.todos/delete ^int id]
    }}

  ;; Handler
  :todo.handler.todos/list
  {:db #ig/ref :duct.database/sql}
  :todo.handler.todos/fetch
  {:db #ig/ref :duct.database/sql}
  :todo.handler.todos/create
  {:db #ig/ref :duct.database/sql}
  :todo.handler.todos/update
  {:db #ig/ref :duct.database/sql}
  :todo.handler.todos/delete
  {:db #ig/ref :duct.database/sql}
  }

 :duct.profile/dev   #duct/include "dev"
 :duct.profile/local #duct/include "local"
 :duct.profile/prod  {}

 :duct.module/logging {}
 :duct.module.web/api
 {}
 :duct.module/sql
 {}}

todo/src/todo/handler/todos.clj

(ns todo.handler.todos
  (:require [ataraxy.response :as response]
            [integrant.core :as ig]
            [todo.boundary.todos :as todos]))

(defmethod ig/init-key ::list [_ {:keys [db]}]
  (fn [_]
    (let [todos (todos/get-todos db)]
      [::response/ok todos])))

(defmethod ig/init-key ::fetch [_ {:keys [db]}]
  (fn [{[_ id] :ataraxy/result}]
    (let [todo (todos/fetch-todo db id)]
      [::response/ok todo])))

(defmethod ig/init-key ::create [_ {:keys [db]}]
  (fn [{[_ params] :ataraxy/result}]
    (let [result (todos/create-todo db params)
          id (:id (first result))]
      [::response/created (str "/todos/" id)])))

(defmethod ig/init-key ::update [_ {:keys [db]}]
  (fn [{[_ id params] :ataraxy/result}]
    (todos/update-todo db id params)
    [::response/no-content]))

(defmethod ig/init-key ::delete [_ {:keys [db]}]
  (fn [{[_ id] :ataraxy/result}]
    (todos/delete-todo db id)
    [::response/no-content]))

todo/src/todo/boundary/todos.clj

(ns todo.boundary.todos
  (:require
   [clojure.java.jdbc :as jdbc]))

(defprotocol Todos
  (get-todos [db])
  (fetch-todo [db id])
  (create-todo [db params])
  (update-todo [db id params])
  (delete-todo [db id]))

(extend-protocol Todos
  duct.database.sql.Boundary

  (get-todos [{:keys [spec]}]
    (jdbc/query spec ["SELECT * FROM todos"]))

  (fetch-todo [{:keys [spec]} id]
    (jdbc/query spec [(format "SELECT * FROM todos WHERE id = '%s'" id)]))

  (create-todo [{:keys [spec]} params]
    (jdbc/insert! spec :todos {:title (:title params)}))

  (update-todo [{:keys [spec]} id params]
    (jdbc/update! spec :todos {:title (:title params)} ["id = ?" id]))

  (delete-todo [{:keys [spec]} id]
    (jdbc/delete! spec :todos ["id = ?" id]))

  )

まとめ

Integrantの「アプリケーションの構造と実装を、明確に分けて開発を行っていける」という機能を利用して作成されたDuctは、ednファイルで各コンポーネント間の依存関係を明示でき、「何が何に依存しているのかednファイルを見ればわかる」といったとこや、提供されているモジュールを利用することにより、実装を手軽に行えるということに魅力を感じました。

Rustで非同期リクエストでハマったこと

こんにちは!SPEEDAプロダクト開発チームの成です。

去年の秋ごろからRustを触り始め、徐々にRustの魅力に惚れられました。 最初は日々コンパイラーにボコボコにされていましたが、 The Book *1 を読みながら、Rustを少しずつ理解していくと、段々コンパイラーと仲良くなってきて、Rustを書くのも楽しくなりました。

小さいな作業効率化のツールから、Rest Api Server、色んな処理を並列化するBatchなどをRustで作ってきました。最近プロダクトのマイクロサービスの極一部もRustで作っており、幸せ感満喫です!!

本日は、HTTPリクエストを並列化するときハマったことをお話したいと思います。 今日の内容はある程度Rustの知識が必要ですが、初めての方は上の The Book の日本語版 から参考できます。

Rustは独特なメモリ管理仕組みを持ち、安全かつRuntimeで発生したら世界が止まるFullGC的なものも一切ないし、高速な並列処理などが特徴です。めちゃくちゃカッコイイ言語と思いますので、本当に広がってほしいです。なので、初めての方々も是非書いてみてください!

はい、宣伝は以上です。本題に入ります。

今日はサンプルコードを介してハマった点を説明させていただきます。

  • 環境

OS rustバージョン
macOS 10.14.6 rustc 1.39.0-nightly
  • 使ったライブラリ

[dependencies]
reqwest = "0.9.20"
tokio-threadpool = "0.1.15"
futures01 = "0.1.28"
futures-preview = { version = "0.3.0-alpha.18", features = ["io-compat"] }
failure = "0.1.5"

今回は tokio-threadpoolfuturesの01系 を使って並列処理を実現します。
HTTPリクエストのライブラリは reqwest を使います。
記事の最後にはRust公式の新しい非同期シンタックス async / await を試したことを共有したいので、 futuresの03系 も入れました。

  • 同期リクエストの実現

1000回リクエストを順番に実行されるサンプルコードです。

use failure::Fail;
use reqwest::{Client, Url};
use std::thread;
use std::thread::ThreadId;
use std::time::{Duration, SystemTime};

pub type ResultWithError<T> = std::result::Result<T, ErrorWrapper>;

#[derive(Fail, Debug)]
pub enum ErrorWrapper {
    #[fail(display = "http request error: {:?}", error)]
    HttpRequestError { error: reqwest::Error },
}

impl From<reqwest::Error> for ErrorWrapper {
    fn from(error: reqwest::Error) -> Self {
        ErrorWrapper::HttpRequestError { error }
    }
}

fn main() {
    request_with_main_thread();
}

fn request_with_main_thread() {
    output1("START SINGLE THREAD");
    let start_at = SystemTime::now();

    let client = Client::new();

    (0..1000)
        .collect::<Vec<i32>>()
        .iter()
        .for_each(|_| match send_request(&client) {
            Ok((thread_id, text)) => output2(thread_id, text.as_str()),
            Err(error) => output1(format!("{:?}", error).as_str()),
        });

    let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0));
    output1(format!("END: {}", spent_time.as_millis()).as_str())
}

fn send_request(client: &Client) -> ResultWithError<(ThreadId, String)> {
    let mut response = client
        .get(Url::parse("http://localhost:9000/timestamp").unwrap())
        .send()?;
    Ok((thread::current().id(), response.text()?))
}

fn output2(thread_id: ThreadId, text: &str) {
    println!("[{:?}] => {}", thread_id, text);
}

fn output1(text: &str) {
    output2(thread::current().id(), text);
}
  • main() 関数以前のコードはエラーハンドリングに関するものなので、説明は割愛します。

  • send_request() 関数ではローカルで別で立ち上がっているRestApiServerにリクエストを投げてシステム時間を返してくれます。

  • 実行結果
[ThreadId(1)] => START SINGLE THREAD
[ThreadId(1)] => 2019-09-01T18:50:22.706987Z
[ThreadId(1)] => 2019-09-01T18:50:22.710136Z
[ThreadId(1)] => 2019-09-01T18:50:22.713299Z
......
[ThreadId(1)] => 2019-09-01T18:50:27.398560Z
[ThreadId(1)] => 2019-09-01T18:50:27.404358Z
[ThreadId(1)] => 2019-09-01T18:50:27.407224Z
[ThreadId(1)] => END: 4711

当たり前ですが、リクエスト処理は全部メインスレッド ThreadId(1) で順番に実行されてます。1000回リクエスは5秒近くかかりました。

  • 非同期リクエストの実現

1000回リクエストを10スレッドに捌けて実行されるサンプルコードです。

use failure::Fail;
use futures01::{future, Future};
use reqwest::r#async::Client as AsyncClient;
use reqwest::Url;
use std::thread;
use std::thread::ThreadId;
use std::time::{Duration, SystemTime};
use tokio_threadpool::{Builder, SpawnHandle};

fn main() {
    request_with_multi_thread();
}

fn request_with_multi_thread() {
    output1("START MULTI THREAD");
    let start_at = SystemTime::now();

    let client = AsyncClient::new();
    let thread_pool_size = 10;
    let thread_pool = Builder::new().pool_size(thread_pool_size).build();

    let mut handles = Vec::<SpawnHandle<(ThreadId, String), ErrorWrapper>>::new();
    while handles.iter().count() <= 1000 {
        let cloned_client = client.clone();
        handles.push(thread_pool.spawn_handle(future::lazy(move || {
            send_request_for_future(&cloned_client)
        })));
    }

    handles.iter_mut().for_each(|handle| match handle.wait() {
        Ok((thread_id, text)) => output2(thread_id, text.as_str()),
        Err(error) => output1(format!("{:?}", error).as_str()),
    });

    thread_pool.shutdown_now();

    let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0));
    output1(format!("END: {}", spent_time.as_millis()).as_str())
}

fn send_request_for_future(
    client: &AsyncClient,
) -> impl Future<Item = (ThreadId, String), Error = ErrorWrapper> {
    client
        .get(Url::parse("http://localhost:9000/timestamp").unwrap())
        .send()
        .and_then(|mut response| response.text())
        .map(|text| (thread::current().id(), text))
        .from_err()
}

fn output2(thread_id: ThreadId, text: &str) {
    println!("[{:?}] => {}", thread_id, text);
}

fn output1(text: &str) {
    output2(thread::current().id(), text);
}
  • エラーハンドリングに関するコードは省略しました。

  • このサンプルでは 10スレッド で 1000回リクエストを捌けようとしてます。

  • タスク Future(send_request_for_futureの戻り値) をキューに溜まって、スレッドプール tokio_threadpool にアイドルなスレッドがあれば、キューからタスクを取り出して実行する。
    (行:22~28)

  • その結果達 SpawnHandle をメインスレッドでまとめる。
    (行:30~33)。

  • 実行結果
[ThreadId(1)] => START MULTI THREAD
[ThreadId(10)] => 2019-09-01T19:02:53.924756Z
[ThreadId(6)] => 2019-09-01T19:02:53.930478Z
[ThreadId(11)] => 2019-09-01T19:02:53.929056Z
[ThreadId(11)] => 2019-09-01T19:02:53.934684Z
[ThreadId(13)] => 2019-09-01T19:02:53.932217Z
[ThreadId(7)] => 2019-09-01T19:02:53.918978Z
[ThreadId(7)] => 2019-09-01T19:02:53.921761Z
......
[ThreadId(10)] => 2019-09-01T19:02:54.454774Z
[ThreadId(14)] => 2019-09-01T19:02:54.456831Z
[ThreadId(15)] => 2019-09-01T19:02:54.458395Z
[ThreadId(15)] => 2019-09-01T19:02:54.459974Z
[ThreadId(1)] => END: 593

リクエスト処理は全部メインスレッド以外のスレッドで行われて、かかった時間もほぼ直列の1/8です。

  • ハマったこと

1. HttpClientが違います。

reqwest::Client をFutureの中で使うと下記のエラーが発生します。

[ThreadId(1)] => HttpRequestError { error: Error(BlockingClientInFutureContext, "http://localhost:9000/timestamp") }

Futureの中では reqwest::r#async::Client を使わないといけないです。

2. reqwest::r#async::Client をマルチスレッド間で共有しないと、大量リクエストが発生したらリソースが枯渇してしまいます。
        let cloned_client = client.clone();
        handles.push(thread_pool.spawn_handle(future::lazy(move || {
            send_request_for_future(&cloned_client)
        })));

この部分を下記のように変えたら

        handles.push(thread_pool.spawn_handle(future::lazy(move || {
            send_request_for_future(&AsyncClient::new())
        })));

下記のエラーが大量に出てきます。(リクエストが全部失敗ではないですが)

[ThreadId(1)] => HttpRequestError { error: Error(Hyper(Error(Io, Os { code: 54, kind: ConnectionReset, message: "Connection reset by peer" })), "http://localhost:9000/timestamp") }

リクエスト先のサーバからコネクションを勝手に切断したためエラーになったと思います。
reqwest::r#async::Client のソースを見てみると、確かにClientには connection pool を持っています。
並列で数多くClientを生成すると、余計にリクエスト先のサーバとコネクションを確立し過ぎで、途中で切断されることになります。

/// An asynchronous `Client` to make Requests with.
///
/// The Client has various configuration values to tweak, but the defaults
/// are set to what is usually the most commonly desired value. To configure a
/// `Client`, use `Client::builder()`.
///
/// The `Client` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it.
#[derive(Clone)]
pub struct Client {
    inner: Arc<ClientRef>,
}
3. Future.wait() を呼び出すときは、そのFutureは 別のスレッドで実行される保証 があるかどうかを考えないといけないです。

最初 send_request_for_future() は下記のように実装していました。

fn send_request_for_future(client: &AsyncClient) -> ResultWithError<(ThreadId, String)> {
    Ok((
        thread::current().id(),
        client
            .get(Url::parse("http://localhost:9000/timestamp").unwrap())
            .send()
            .wait()?
            .text()
            .wait()?,
    ))
}

reqwest::r#async::Client は使っていますが、Futureを返せずに実際の処理結果を返そうとしていました。.send().text() はそれぞれFutureを返すので、実際の処理結果を取り出すために .wait()? を2回呼び出しました。その結果、実行した直後に [ThreadId(1)] => START MULTI THREAD だけが出力され、その後は一切処理が進まなかったです。
原因は send_request_for_future() はthread_pool中のスレッドから実行されますが、そこで呼び出している2回の .wait()? は更に他のスレッドに実行してくれないと、処理が進まないです。thread_pool中のスレッドが全部 send_request_for_future() の実行に使ってしまったら、処理全体が進まなくなります。もしthread_pool_sizeがリクエスト回数より多い場合(この例だと let thread_pool_size = 1001; )はすべてのリクエストが正常に終わります。
send_request_for_future() がFutureを返すし、メインスレッドで SpawnHandle.wait() を呼び出すように実装したら、処理全体が正常に終えました。

  • まとめ

ほぼどの技術のハマりも同じだと思います。本質を理解したら当たり前のように見えますが、理解する前には詳しい人に聞いたり、ググったりしてバタバタしても中々解決しないとイライラしてしまいますね。今回のハマリポイント、特に3点目は色々ググっても類似のサンプルソースがなかったので、困りました。もし同じ問題に困っている方がいらっしゃったら、この記事が参考にできるならば嬉しいです。
ここで使っているサンプルソースは下記のプロジェクトからcloneできます。

https://github.com/dimmy82/rust-concurrency-request

  • (おまけ)async / await

今年の7月ぐらい、新しい非同期シンタックス async / await がリリースされて、ちょっと触ってみたので共有させていただきます。もし間違っているところがあればご指摘ください。

use failure::Fail;
use futures::executor::block_on;
use reqwest::{Client, Url};
use std::thread;
use std::thread::ThreadId;
use std::time::{Duration, SystemTime};

fn main() {
    block_on(request_with_async_await()); // this is not a parallel request example
}

async fn request_with_async_await() {
    output1("START ASYNC AWAIT");
    let start_at = SystemTime::now();

    let client = Client::new();
    let future1 = send_request_async(&client);
    let future2 = send_request_async(&client);
    match future1.await {
        Ok((thread_id, text)) => output2(thread_id, text.as_str()),
        Err(error) => output1(format!("{:?}", error).as_str()),
    };
    match future2.await {
        Ok((thread_id, text)) => output2(thread_id, text.as_str()),
        Err(error) => output1(format!("{:?}", error).as_str()),
    };

    let spent_time = start_at.elapsed().unwrap_or(Duration::new(0, 0));
    output1(format!("END: {}", spent_time.as_millis()).as_str())
}

async fn send_request_async(client: &Client) -> ResultWithError<(ThreadId, String)> {
    let mut response = client
        .get(Url::parse("http://localhost:9000/timestamp").unwrap())
        .send()?;
    Ok((thread::current().id(), response.text()?))
}

fn output2(thread_id: ThreadId, text: &str) {
    println!("[{:?}] => {}", thread_id, text);
}

fn output1(text: &str) {
    output2(thread::current().id(), text);
}

結論から言うと、上記サンプルのように

let future1 = /* async 関数を呼び出す */;
let future2 = /* async 関数を呼び出す */;
future1.await;
future2.await;

という書き方は非同期にならず、同期処理になります。 async 関数は await で呼び出されたタイミングで実行される からです。

  • 実行結果
[ThreadId(1)] => START ASYNC AWAIT
[ThreadId(1)] => 2019-09-01T21:23:12.791994Z
[ThreadId(1)] => 2019-09-01T21:23:12.794254Z
[ThreadId(1)] => END: 8

async 関数は処理結果(今回の例だと ResultWithError<(ThreadId, String)> )を返すように実装しますが、実際future1の型は impl core::future::Future<Output=ResultWithError<(ThreadId, String)>> になります。なので、もし非同期処理にしたければ、future1, future2を thread_pool.spawn_handle 的な関数に渡して実行してもらうじゃないかなと思います。
ただ、ここの core::future::Future は最近stdに追加されたFuture Traitですが、 futures01::Future とは全然違うものです。なので、使っているthread_poolのバージョンはまだ対応していないようです。
今後 core::future::Future に関する使い方が分かったらまた更新します。

以上、よろしくお願いいたします。

*1:The Book: Rustの創始者を含むコニュニティの方々が執筆するRust仕様の本です。去年までは日本語版が古くて、google先生に助けもらいながら無理やり英語版を読んでましたが、最近翻訳の皆さんのおかげて英語版とほぼ同期する 日本語版 が出てきましたので、Rustの勉強ならばそっちが一番おすすめです。