はじめに
こんにちは、SaaS Product Teamのヒロオカです。
SPEEDAではSpring Webfluxの採用が行われおり、一部リアクティブなシステムが動いています。
今回は、R2DBCという、リアクティブな非同期でRDBにするための仕様とSpring(Reactor Project) による実装およびサポートを利用して、APIの実装を試してみたいと思います。
R2DBとは
前述していますが、R2DBCはRDBアクセスに対して、リアクティブな非同期APIを提供するための仕様です。 以下のような特徴を持っています。
- Reactive Streams specificationに基づいている
- リアクティブなAPIを利用してRDBにアクセスできる
- 1コネクション1Threadのモデルより、スケーラブルである
- R2DBCはオープンな仕様であり、Service Provider Interface(SPI)を定義している
ちなみにR2DBCはReactive Relational Database Connectivity
の略です。
リアクティブシステムにおけるJDBCの問題点
じゃあ、今までのJDBCだとどういった点が問題なのかというところに関してですが、大きく2つの問題点があります。
- そもそも、JDBCはブロッキングAPI
- トランザクションの情報をThreadLocalに保存してしまう
まず前者に関してですが、言葉通りでJDBCはそもそもブロッキングです。1コネクションに対して1Threadを割り当てるモデルを採用しています。リアクティブシステムのなかでJDBCを利用すると、EventLoop等のThreadをブロックして、リクエストがハングしてしまうと言ったようなことが起こります。Mono.fromCallable等をつかって、ブロッキングの処理を切り離すなどは、やろうと思って思えばできないことは無いと思います。とはいえ、リアクティブにシステムを構築しているメリットが減ってしまいます。
また、JDBCはトランザクションの情報をThreadLocalに保存します。コードが実行されるThreadがダイナミックに変わるリアクティブシステムに置いては、これは大きな問題となります。
こういった理由により、リアクティブシステムに置いてJDBCを利用することは、あまり得策ではないと思われます。
実際に使っていく
R2DBCについて簡単に紹介したところで、実際にコードを書いていって動かしてみたいと思います。
今回は、R2DBCとWebfluxを用いて、顧客情報を提供する簡単なWebAPIを実装してみたいと思います。
具体的に提供するAPIは以下の通りになります。
GET /customers
で顧客の一覧を取得できるGET /customers/{id}
で指定した顧客の情報を取得できるPOST /customers
で顧客の登録ができる
WebAPIはSpring Bootを使って作成します。
今回、DBはPostgresSQLを利用しますが、その起動はDockerを用いてやろうと思います。
環境
アプリケーションの実行環境は以下の通りです。
$ java --version openjdk 15 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/somebody/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-60-generic", arch: "amd64", family: "unix" $ docker version Client: Docker Engine - Community Version: 20.10.2 API version: 1.41 Go version: go1.13.15 Git commit: 2291f61 Built: Mon Dec 28 16:17:43 2020 OS/Arch: linux/amd64 Context: default Experimental: true Server: Docker Engine - Community Engine: Version: 20.10.2 API version: 1.41 (minimum version 1.12) Go version: go1.13.15 Git commit: 8891c58 Built: Mon Dec 28 16:15:19 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.4.3 GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b runc: Version: 1.0.0-rc92 GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff docker-init: Version: 0.19.0 GitCommit: de40ad0
事前準備
プロジェクトの作成
Spring Initializrを用いて作成します。
設定は以下の通りとなります。
R2DBC関連のものに関しては、Spring Data R2DBC
とPostgresSQL Driver
の依存を追加してやる必要があります。DriverはH2
やMySQL
など様々用意されています。その一覧に関しましてはここを参照してください。
DBの起動
Dockerを使ってPostgresSQLを起動しておきます。
$ docker run --rm --name postgres-r2dbc -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres
今回は試しに利用するだけなので、Volumeを当てたりするようなことはしません。
DBの作成だけ行っておきます。
$ docker exec -it postgres-r2dbc psql -U postgres psql (12.2 (Debian 12.2-2.pgdg100+1)) Type "help" for help. postgres=# CREATE DATABASE customers; CREATE DATABASE postgres=#
APIを実装していく
まずはテストから...と言いたいところですが、テストは今回の主眼では無いので割愛させていただきます。
また、設計の良し悪しに関しても一旦おいておいて、ハンドラーからリポジトリーを直接呼び出すようにします。
空ハンドラーの実装とルーティングの設定
まずは、空のハンドラーを実装します。
@Component public class CustomerHandler { // 顧客一覧を取得するハンドラー Mono<ServerResponse> findAll(ServerRequest request) { return ServerResponse.ok().build } // ID指定した顧客を受け取るハンドラー Mono<ServerResponse> find(ServerRequest request) { return ServerResponse.ok().build(); } // 顧客を登録するハンドラー Mono<ServerResponse> create(ServerRequest request) { return ServerResponse.status(HttpStatus.CREATED).build(); } }
ここでは、まだそれぞれのステータスコードを返すだけのハンドラーになっています。
WebfluxではMVCと同様にアノテーションベースのハンドラーの定義が行えますが。今回は、一覧性が高さを優先してRouterFunctionを使ってルーティングの設定を書いていきます。
ルーティングの設定はエントリーポイントであるR2dbcSampleApplication.class
に直接書きます。
@SpringBootApplication public class { public static void main(String[] args) { SpringApplication.run(R2dbcSampleApplication.class, args); } @Bean RouterFunction<ServerResponse> setUpEndPoints(CustomerHandler customerHandler) { return route() .nest(path("/customers"), builder -> builder .GET("/{id}", customerHandler::find) .GET(customerHandler::findAll) .POST(customerHandler::create) .filter((req, res) -> res.handle(req) .onErrorResume(CustomerNotFoundException.class, e -> ServerResponse.notFound().build()))).build(); } }
ここまでは、Webfluxの話なので細かい説明は割愛させていただきますが、今回作ることを想定している3つのエンドポイントとカスタマーが見つからなかったときのハンドリングをフィルターで書いています。(CustomerNotFoundExceptionはRuntimeExceptionを継承した自作のクラスです。ソースコードはここ)
リポジトリの実装
それでは、今回のメインであるリポジトリを実装していきたいと思います。
以下の流れで作って行きたいと思います。
- DBの諸々の設定
- BDの初期化処理の記述
- エンティティの作成
- ReactiveCrudRepositoryを継承したリポジトリインターフェースを作成
DBの諸々の設定
前述の通り、今回はSpring Bootを用いています。
本来であればConnectionFactoryやDatabaseClient等を自分で設定しなければなりませせんが、org.springframework.boot.autoconfigure.r2dbcやorg.springframework.boot.autoconfigure.data.r2dbc
がそのへんはよしなってくれているみたいなので、その設定をそのまま使いたいと思います。
カスタマイズなどをする際も、この辺を参考にすればできそうです。
アプリケーション固有の設定をapplication.properties
に書いてやる必要があります。Bootではspring.r2dbc.*
のプリフィクスで設定するプロパティのプールのサイズ、タイムアウトや接続情報を記述することができます。
今回は接続情報だけを以下の通り記述します。
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/customers spring.r2dbc.username=postgres spring.r2dbc.password=password
基本的には今まで書いていたような設定と似たような感じにはなるのですがurlのr2dbc
の部分が今までjdbc
と書いていたと思いますので注意が必要です。
BDの初期化処理の記述
ドキュメントによると、データの初期化に関しては以下のコードをR2dbcSampleApplication.class
に追加することで行えそうです。
@Bean public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) { ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer(); initializer.setConnectionFactory(connectionFactory); CompositeDatabasePopulator populator = new CompositeDatabasePopulator(); populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("./db-schema.sql"))); initializer.setDatabasePopulator(populator); return initializer; }
後述しますが、Spring Data R2DBCを使ってデータを操作する方法はいくつかあります。
上記のコードはそのうちの1つを使って初期化ようのsqlを流しています。
resources
配下に初期化用のdb-schema.sql
ファイルも用意しておきましょう。
drop table if exists customer; create table customer ( id varchar(36), user_name varchar(10), primary key (id) );
エンティティの作成
次に、DBのカラムを表現するエンティティを作成します。
public class Customer implements Persistable { public Customer() { } public Customer(String id, String userName, boolean isNew) { this.id = id; this.userName = userName; this.isNew = isNew; } @Id private String id; private String userName; @Transient private boolean isNew; // setterとgetterは省略 @Override public boolean isNew() { return isNew; } }
今回はユーザネームとIDを持つだけのシンプルな顧客情報を取り扱うことを想定します。
isNew
が気になるところではあるかもしれませんが、後述しますので、ここでは一旦無視していただいて大丈夫です。
ReactiveCrudRepositoryを継承したリポジトリインターフェースを作成
Spring Data R2DBCを利用してRDBにクエリを発行する方法には以下のようなものがあります
- ConnectionFactoryからConnectionを直接取得して利用する方法
- DatabaseClientを利用する方法
- R2dbcEntityTemplateを利用する方法
- ReactiveCrudRepositoryを継承したリポジトリインターフェースを作成する方法
今回はその4番目の方法をやってみたいと思います。
ReactiveCrudRepositoryってなんぞ?というところに関してですが。
Spring Dataはリポジトリの抽象化を用意してくれており、そのインターフェスを継承した独自のインターフェースを実装することで、いわゆるボイラープレートを削減することができます。身近そうな例で言うとJpaRepository で、このリポジトリを継承するインターフェースを作るとDBにCURDを行なうためのクラスを自動生成してくれます。
Spring Data R2DBCにおいては、その抽象化の一つにReactiveCrudRepositoryがあります。
今回の場合以下のようにインタフェースを作成します。
import org.springframework.data.repository.reactive.ReactiveCrudRepository; interface CustomerRepository extends ReactiveCrudRepository<Customer, String> { }
実は今回のユースケースに関しては、デフォルトで提供されるクラスだけを利用していれば問題ないのでこれでリポジトリの実装は終わりです。
今回のユースケースでは必要ありませんが、@QueryでのSpEL式のサポートしているので、もし新たにクエリを足す必要などがある場合は以下のように定義できます。
interface CustomerRepository extends ReactiveCrudRepository<Customer, String> { @Query("select * from customer c where c.user_name == $1") Mono<Customer> findByUserName(String userName); }
ハンドラーの修正
リポジトリができたのでそのリポジトリを用いて、ハンドラーを実装していきます。
@Component public class CustomerHandler { private final CustomerRepository customerRepository; public CustomerHandler(CustomerRepository customerRepository) { this.customerRepository = customerRepository; } // 顧客一覧を取得するハンドラー Mono<ServerResponse> findAll(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_NDJSON) .body(customerRepository.findAll() .map(c -> new CustomerDto(c.getId(), c.getUserName())), CustomerDto.class); } // ID指定した顧客を受け取るハンドラー Mono<ServerResponse> find(ServerRequest request) { String id = request.pathVariable("id"); return customerRepository.findById(id) .map(c -> new CustomerDto(c.getId(), c.getUserName())) .flatMap(c -> ServerResponse.ok().body(Mono.just(c), CustomerDto.class)) .switchIfEmpty(Mono.error(new CustomerNotFoundException(String.format("Customer whose id %s is not found", id)))); } // 顧客を登録するハンドラー Mono<ServerResponse> create(ServerRequest request) { return request.bodyToMono(CustomerDto.class) .map(c -> new Customer(c.getId(), c.getName(), true)) .map(customerRepository::save) .flatMap(customer -> ServerResponse.status(HttpStatus.CREATED) .body(customer.map(c -> new CustomerDto(c.getId(), c.getUserName())), CustomerDto.class)); } }
ごちゃごちゃいろいろ書いてますが、外部APIコールの形を表現するConsumerDto(実装はこちら)をエンティティにマップしているだけです。
もう一つ注目する点として、先程、エンティティにisNew
の部分です。顧客登録のハンドラーを注目すると、この値にtureをセットしています。
これは、R2DBCリポジトリのエンティティ状態検出戦略
によるものでデフォルトではエンティティのIDがnullでない場合はエンティティはすでにDBに存在するものとして扱われてしまいます。エンティティのIDを外部のロジックを利用して生成したい場合は、その回避策としてPersistableを実装してisNew()
メソッドでエンティティが新しいかどうかを判定するようにしてやることができます。
今回は、入力として渡されるIDをそのままインサートしたかったので上記のようにisNew
プロパティを定義して、外部から新しいエンティティなのかどうかを制御できるようにしています。
これで、WebAPIの実装まで終わりました。
それではアプリケーションを起動して、その挙動を確認してみたいと思います。
# アプリケーションの起動 $ mvn spring-boot:run # 顧客の一覧を取得する # まだ、顧客はいないので空のリストが返ってくる $ curl -v localhost:8080/customers * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /customers HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < transfer-encoding: chunked < Content-Type: application/x-ndjson < * Connection #0 to host localhost left intact # 存在しない顧客を取得する # 存在しない顧客のIDを指定しているので404が返ってくる $ curl -v localhost:8080/customers/notexist * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /customers/notexist HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 404 Not Found < content-length: 0 < * Connection #0 to host localhost left intact # 顧客を登録する $ curl -v -XPOST -d '{"id": "15c6895c-54ac-11eb-95a1-576ca336aec3", "name": "hirooka"}' --header "Content-type: application/json" localhost:8080/customers Note: Unnecessary use of -X or --request, POST is already inferred. * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > POST /customers HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > Content-type: application/json > Content-Length: 65 > * upload completely sent off: 65 out of 65 bytes * Mark bundle as not supporting multiuse < HTTP/1.1 201 Created < Content-Type: application/json < Content-Length: 62 < * Connection #0 to host localhost left intact {"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"hirooka"} # 顧客を登録する $ curl -v -XPOST -d '{"id": "15c6895c-54ac-11eb-95a1-576ca336aec3", "name": "uzabase-1"}' --header "Content-type: application/json" localhost:8080/customers Note: Unnecessary use of -X or --request, POST is already inferred. * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > POST /customers HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > Content-type: application/json > Content-Length: 67 > * upload completely sent off: 67 out of 67 bytes * Mark bundle as not supporting multiuse < HTTP/1.1 201 Created < Content-Type: application/json < Content-Length: 64 < * Connection #0 to host localhost left intact {"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"uzabase-1"} # 顧客の一覧を取得する # 先程作成したした顧客情報が返ってくる $ curl -v localhost:8080/customers * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /customers HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < transfer-encoding: chunked < Content-Type: application/x-ndjson < {"id":"15c6895c-54ac-11eb-95a1-576ca336aec3","name":"uzabase-1"} {"id":"e077985c-54ad-11eb-a03c-9f7f02e269eb","name":"uzabase-2"} * Connection #0 to host localhost left intact # ID指定で顧客を取得する $ curl -v localhost:8080/customers/e077985c-54ad-11eb-a03c-9f7f02e269eb * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /customers/e077985c-54ad-11eb-a03c-9f7f02e269eb HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < Content-Type: application/json < Content-Length: 64 < * Connection #0 to host localhost left intact {"id":"e077985c-54ad-11eb-a03c-9f7f02e269eb","name":"uzabase-2"}
これでAPIのSpring Data R2DBCを用いたWebAPIの作成ができました。
終わりに
はじめにでも記述しましたが、SPEEDAでは一部リアクティブの技術を使ってシステムを構築しています。しかし、今はまだ導入段階であり、その機能は限定的に使えている状態であると言った感じです。今後、R2DBCやRScoket等を導入したフルリアクティブなシステムを構築して行けるといいなぁと個人的な思いがあったりします。