Uzabase Tech Blog

SPEEDA, NewsPicks, FORCASなどを開発するユーザベースの技術チームブログです。

Spring Data R2DBCでリアクティブにDBアクセスを行なう

はじめに

こんにちは、SaaS Product Teamのヒロオカです。
SPEEDAではSpring Webfluxの採用が行われおり、一部リアクティブなシステムが動いています。

今回は、R2DBCという、リアクティブな非同期でRDBにするための仕様とSpring(Reactor Project) による実装およびサポートを利用して、APIの実装を試してみたいと思います。

R2DBとは

前述していますが、R2DBCはRDBアクセスに対して、リアクティブな非同期APIを提供するための仕様です。 以下のような特徴を持っています。

  • Reactive Streams specificationに基づいている
  • リアクティブなAPIを利用してRDBにアクセスできる
    • 1コネクション1Threadのモデルより、スケーラブルである
  • R2DBCはオープンな仕様であり、Service Provider Interface(SPI)を定義している

ちなみにR2DBCはReactive Relational Database Connectivity の略です。

リアクティブシステムにおけるJDBCの問題点

じゃあ、今までのJDBCだとどういった点が問題なのかというところに関してですが、大きく2つの問題点があります。

  • そもそも、JDBCはブロッキングAPI
  • トランザクションの情報をThreadLocalに保存してしまう

まず前者に関してですが、言葉通りでJDBCはそもそもブロッキングです。1コネクションに対して1Threadを割り当てるモデルを採用しています。リアクティブシステムのなかでJDBCを利用すると、EventLoop等のThreadをブロックして、リクエストがハングしてしまうと言ったようなことが起こります。Mono.fromCallable等をつかって、ブロッキングの処理を切り離すなどは、やろうと思って思えばできないことは無いと思います。とはいえ、リアクティブにシステムを構築しているメリットが減ってしまいます。

また、JDBCはトランザクションの情報をThreadLocalに保存します。コードが実行されるThreadがダイナミックに変わるリアクティブシステムに置いては、これは大きな問題となります。

こういった理由により、リアクティブシステムに置いてJDBCを利用することは、あまり得策ではないと思われます。

実際に使っていく

R2DBCについて簡単に紹介したところで、実際にコードを書いていって動かしてみたいと思います。 今回は、R2DBCとWebfluxを用いて、顧客情報を提供する簡単なWebAPIを実装してみたいと思います。
具体的に提供するAPIは以下の通りになります。

  • GET /customers で顧客の一覧を取得できる
  • GET /customers/{id} で指定した顧客の情報を取得できる
  • POST /customers で顧客の登録ができる

WebAPIはSpring Bootを使って作成します。
今回、DBはPostgresSQLを利用しますが、その起動はDockerを用いてやろうと思います。

環境

アプリケーションの実行環境は以下の通りです。

$ java --version
openjdk 15 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)

$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/somebody/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-60-generic", arch: "amd64", family: "unix"

$ docker version
Client: Docker Engine - Community
 Version:           20.10.2
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        2291f61
 Built:             Mon Dec 28 16:17:43 2020
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.2
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       8891c58
  Built:            Mon Dec 28 16:15:19 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

事前準備

プロジェクトの作成

Spring Initializrを用いて作成します。
設定は以下の通りとなります。

f:id:yuya_hirooka:20210112130255p:plain

R2DBC関連のものに関しては、Spring Data R2DBCPostgresSQL Driverの依存を追加してやる必要があります。DriverはH2MySQLなど様々用意されています。その一覧に関しましてはここを参照してください。

DBの起動

Dockerを使ってPostgresSQLを起動しておきます。

$ docker run  --rm --name postgres-r2dbc -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres

今回は試しに利用するだけなので、Volumeを当てたりするようなことはしません。
DBの作成だけ行っておきます。

$ docker exec -it postgres-r2dbc  psql -U postgres 
psql (12.2 (Debian 12.2-2.pgdg100+1))
Type "help" for help.

postgres=# CREATE DATABASE customers;
CREATE DATABASE
postgres=# 

APIを実装していく

まずはテストから...と言いたいところですが、テストは今回の主眼では無いので割愛させていただきます。
また、設計の良し悪しに関しても一旦おいておいて、ハンドラーからリポジトリーを直接呼び出すようにします。

空ハンドラーの実装とルーティングの設定

まずは、空のハンドラーを実装します。

@Component
public class CustomerHandler {

    // 顧客一覧を取得するハンドラー
    Mono<ServerResponse> findAll(ServerRequest request) {
        return ServerResponse.ok().build
    }

    // ID指定した顧客を受け取るハンドラー
    Mono<ServerResponse> find(ServerRequest request) {
        return ServerResponse.ok().build();
    }

    // 顧客を登録するハンドラー  
    Mono<ServerResponse> create(ServerRequest request) {
        return ServerResponse.status(HttpStatus.CREATED).build();
    }
}

ここでは、まだそれぞれのステータスコードを返すだけのハンドラーになっています。

WebfluxではMVCと同様にアノテーションベースのハンドラーの定義が行えますが。今回は、一覧性が高さを優先してRouterFunctionを使ってルーティングの設定を書いていきます。
ルーティングの設定はエントリーポイントであるR2dbcSampleApplication.classに直接書きます。

@SpringBootApplication
public class  {

    public static void main(String[] args) {
        SpringApplication.run(R2dbcSampleApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> setUpEndPoints(CustomerHandler customerHandler) {
        return route()
                .nest(path("/customers"),
                        builder -> builder
                                .GET("/{id}", customerHandler::find)
                                .GET(customerHandler::findAll)
                                .POST(customerHandler::create)
                                .filter((req, res) -> res.handle(req)
                                        .onErrorResume(CustomerNotFoundException.class,
                                                e -> ServerResponse.notFound().build()))).build();
    }
}

ここまでは、Webfluxの話なので細かい説明は割愛させていただきますが、今回作ることを想定している3つのエンドポイントとカスタマーが見つからなかったときのハンドリングをフィルターで書いています。(CustomerNotFoundExceptionはRuntimeExceptionを継承した自作のクラスです。ソースコードはここ)

リポジトリの実装

それでは、今回のメインであるリポジトリを実装していきたいと思います。
以下の流れで作って行きたいと思います。

  • DBの諸々の設定
  • BDの初期化処理の記述
  • エンティティの作成
  • ReactiveCrudRepositoryを継承したリポジトリインターフェースを作成

DBの諸々の設定

前述の通り、今回はSpring Bootを用いています。
本来であればConnectionFactoryやDatabaseClient等を自分で設定しなければなりませせんが、org.springframework.boot.autoconfigure.r2dbcorg.springframework.boot.autoconfigure.data.r2dbc がそのへんはよしなってくれているみたいなので、その設定をそのまま使いたいと思います。
カスタマイズなどをする際も、この辺を参考にすればできそうです。

アプリケーション固有の設定をapplication.propertiesに書いてやる必要があります。Bootではspring.r2dbc.*のプリフィクスで設定するプロパティのプールのサイズ、タイムアウトや接続情報を記述することができます。
今回は接続情報だけを以下の通り記述します。

spring.r2dbc.url=r2dbc:postgresql://localhost:5432/customers
spring.r2dbc.username=postgres
spring.r2dbc.password=password

基本的には今まで書いていたような設定と似たような感じにはなるのですがurlのr2dbcの部分が今までjdbcと書いていたと思いますので注意が必要です。

BDの初期化処理の記述

ドキュメントによると、データの初期化に関しては以下のコードをR2dbcSampleApplication.classに追加することで行えそうです。

    @Bean
    public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {

        ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
        initializer.setConnectionFactory(connectionFactory);

        CompositeDatabasePopulator populator = new CompositeDatabasePopulator();
        populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("./db-schema.sql")));
        initializer.setDatabasePopulator(populator);

        return initializer;
    }

後述しますが、Spring Data R2DBCを使ってデータを操作する方法はいくつかあります。
上記のコードはそのうちの1つを使って初期化ようのsqlを流しています。

resources配下に初期化用のdb-schema.sqlファイルも用意しておきましょう。

drop table if exists customer;

create table customer (
    id varchar(36),
    user_name varchar(10),
    primary key (id)
);

エンティティの作成

次に、DBのカラムを表現するエンティティを作成します。

public class Customer implements Persistable {

    public Customer() {
    }

    public Customer(String id, String userName, boolean isNew) {
        this.id = id;
        this.userName = userName;
        this.isNew = isNew;
    }

    @Id
    private String id;

    private String userName;

    @Transient
    private boolean isNew;

   // setterとgetterは省略

    @Override
    public boolean isNew() {
        return isNew;
    }
}

今回はユーザネームとIDを持つだけのシンプルな顧客情報を取り扱うことを想定します。
isNewが気になるところではあるかもしれませんが、後述しますので、ここでは一旦無視していただいて大丈夫です。

ReactiveCrudRepositoryを継承したリポジトリインターフェースを作成

Spring Data R2DBCを利用してRDBにクエリを発行する方法には以下のようなものがあります

今回はその4番目の方法をやってみたいと思います。

ReactiveCrudRepositoryってなんぞ?というところに関してですが。
Spring Dataはリポジトリの抽象化を用意してくれており、そのインターフェスを継承した独自のインターフェースを実装することで、いわゆるボイラープレートを削減することができます。身近そうな例で言うとJpaRepository で、このリポジトリを継承するインターフェースを作るとDBにCURDを行なうためのクラスを自動生成してくれます。
Spring Data R2DBCにおいては、その抽象化の一つにReactiveCrudRepositoryがあります。
今回の場合以下のようにインタフェースを作成します。

import org.springframework.data.repository.reactive.ReactiveCrudRepository;

interface CustomerRepository extends ReactiveCrudRepository<Customer, String> {
}

実は今回のユースケースに関しては、デフォルトで提供されるクラスだけを利用していれば問題ないのでこれでリポジトリの実装は終わりです。

今回のユースケースでは必要ありませんが、@QueryでのSpEL式のサポートしているので、もし新たにクエリを足す必要などがある場合は以下のように定義できます。

interface CustomerRepository extends ReactiveCrudRepository<Customer, String> {

    @Query("select * from customer c where c.user_name == $1")
    Mono<Customer> findByUserName(String userName);

}

ハンドラーの修正

リポジトリができたのでそのリポジトリを用いて、ハンドラーを実装していきます。

@Component
public class CustomerHandler {

    private final CustomerRepository customerRepository;

    public CustomerHandler(CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    // 顧客一覧を取得するハンドラー
    Mono<ServerResponse> findAll(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(customerRepository.findAll()
                                .map(c -> new CustomerDto(c.getId(), c.getUserName())), CustomerDto.class);
    }

    // ID指定した顧客を受け取るハンドラー
    Mono<ServerResponse> find(ServerRequest request) {
        String id = request.pathVariable("id");
        return customerRepository.findById(id)
                .map(c -> new CustomerDto(c.getId(), c.getUserName()))
                .flatMap(c -> ServerResponse.ok().body(Mono.just(c), CustomerDto.class))
                .switchIfEmpty(Mono.error(new CustomerNotFoundException(String.format("Customer whose id %s is not found", id))));
    }

    // 顧客を登録するハンドラー 
    Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(CustomerDto.class)
                .map(c -> new Customer(c.getId(), c.getName(), true))
                .map(customerRepository::save)
                .flatMap(customer ->
                        ServerResponse.status(HttpStatus.CREATED)
                                .body(customer.map(c -> new CustomerDto(c.getId(), c.getUserName())), CustomerDto.class));
    }
}

ごちゃごちゃいろいろ書いてますが、外部APIコールの形を表現するConsumerDto(実装はこちら)をエンティティにマップしているだけです。
もう一つ注目する点として、先程、エンティティにisNewの部分です。顧客登録のハンドラーを注目すると、この値にtureをセットしています。 これは、R2DBCリポジトリのエンティティ状態検出戦略 によるものでデフォルトではエンティティのIDがnullでない場合はエンティティはすでにDBに存在するものとして扱われてしまいます。エンティティのIDを外部のロジックを利用して生成したい場合は、その回避策としてPersistableを実装してisNew()メソッドでエンティティが新しいかどうかを判定するようにしてやることができます。
今回は、入力として渡されるIDをそのままインサートしたかったので上記のようにisNewプロパティを定義して、外部から新しいエンティティなのかどうかを制御できるようにしています。

これで、WebAPIの実装まで終わりました。
それではアプリケーションを起動して、その挙動を確認してみたいと思います。

# アプリケーションの起動
$ mvn spring-boot:run


# 顧客の一覧を取得する
# まだ、顧客はいないので空のリストが返ってくる
$ curl -v localhost:8080/customers
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /customers HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
* Connection #0 to host localhost left intact


# 存在しない顧客を取得する
# 存在しない顧客のIDを指定しているので404が返ってくる
$ curl -v localhost:8080/customers/notexist
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /customers/notexist HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 404 Not Found
< content-length: 0
< 
* Connection #0 to host localhost left intact

# 顧客を登録する
$ curl -v -XPOST -d '{"id": "15c6895c-54ac-11eb-95a1-576ca336aec3", "name": "hirooka"}' --header "Content-type: application/json"  localhost:8080/customers
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /customers HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-type: application/json
> Content-Length: 65
> 
* upload completely sent off: 65 out of 65 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 201 Created
< Content-Type: application/json
< Content-Length: 62
< 
* Connection #0 to host localhost left intact
{"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"hirooka"}


# 顧客を登録する
$ curl -v -XPOST -d '{"id": "15c6895c-54ac-11eb-95a1-576ca336aec3", "name": "uzabase-1"}' --header "Content-type: application/json"  localhost:8080/customers
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /customers HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-type: application/json
> Content-Length: 67
> 
* upload completely sent off: 67 out of 67 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 201 Created
< Content-Type: application/json
< Content-Length: 64
< 
* Connection #0 to host localhost left intact
{"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"uzabase-1"}

# 顧客の一覧を取得する
# 先程作成したした顧客情報が返ってくる
$ curl -v localhost:8080/customers
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /customers HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"uzabase-1"}
{"id":"e077985c-54ad-11eb-a03c-9f7f02e269eb","name":"uzabase-2"}
* Connection #0 to host localhost left intact

# ID指定で顧客を取得する  
$ curl -v localhost:8080/customers/e077985c-54ad-11eb-a03c-9f7f02e269eb
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /customers/e077985c-54ad-11eb-a03c-9f7f02e269eb HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: application/json
< Content-Length: 64
< 
* Connection #0 to host localhost left intact
{"id":"e077985c-54ad-11eb-a03c-9f7f02e269eb","name":"uzabase-2"}

これでAPIのSpring Data R2DBCを用いたWebAPIの作成ができました。

終わりに

はじめにでも記述しましたが、SPEEDAでは一部リアクティブの技術を使ってシステムを構築しています。しかし、今はまだ導入段階であり、その機能は限定的に使えている状態であると言った感じです。今後、R2DBCやRScoket等を導入したフルリアクティブなシステムを構築して行けるといいなぁと個人的な思いがあったりします。

参考資料