Spring WebFluxを利用したリアクティブプログラミング入門

アソビューAdvent Calendar 2022の3日目の記事です。

こんにちは、バックエンドの開発を担当しているけんすーです。
今回はアソビュー内のAPI Gatewayで利用しているSpring WebFluxについて紹介したいと思います。

はじめに

Spring WebFluxとは

Spring5から追加された機能で、ノンブロッキングな処理を実行できます。 従来のSpring MVCではリクエストの応答が返ってくるまでスレッドを占有し続けてしまい、別の処理を実行する度に新しいスレッドを生成する必要がありました。

一方で、Spring WebFluxではスレッドを占有することなく、そのスレッドを使い回して処理を実行できるため、 少ないスレッドでリクエストを大量に捌くことができます。そのため、API Gatewayのような複数のサービスを集約し、トラフィックの増加が見込まれるレイヤーにおいて重要な機能となっています。

サンプル実装

題材

今回はユーザが閲覧したプランの情報を取得するAPIを作成し、WebFluxを利用した順次処理や並列処理の実装方法について解説していきます。 アソビューに掲載されているプランはアクティビティとチケットの2つのタイプに分かれているため、各APIを呼び出して実装することにします。

実行環境

  • Mac Book Pro 13.0.1
  • Intellij IDEA 2021.3.3 (Ultimate Edition)
  • Java11
  • Spring Boot 2.7.5

事前準備

まずはSpring Initializrに沿ってWeb->Spring Reactive Webを追加します。
build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

今回はユーザとプラン(アクティビティとチケット)情報を取得するため、以下のモデルを使用しています。

@Data
@Builder
public class User {
    private List<String> activityCodes;
    private List<String> ticketCodes;
}

@Data
@Builder
public class Activity {
    private String activityCode;
    private String title;
    private String imageUrl;
}

@Data
@Builder
public class Ticket {
    private String ticketCode;
    private String title;
    private String imageUrl;
}

@Data
@Builder
public class ViewedHistory {
    private List<Activity> activities;
    private List<Ticket> tickets;
}

ルーティング

続いてAPIのルーティングを記述します。
Spring WebFluxでは従来のSpringMVCのアノテーション付きコントローラーのような書き方もできるのですが、今回は関数型プログラミングのように記述することが可能なRouterFunctionを利用していきます。

WebFluxSampleRouting.java

@Component
public class WebFluxSampleRouting {
    @Bean
    public RouterFunction<ServerResponse> routes(UserHandler handler) {
        return RouterFunctions.route()
                .POST("/viewedHistory", handler::listViewedHistory)
                .build();
    }
}

routers内で実際に処理を行うhandlerを引数に渡し、リクエストに対して実行したい関数をRouterFunction.route()に追加してきます。
今回の場合はPOSTメソッドでuserIdを受け取る想定でルーティングを定義しました。

API呼び出し

続いて各APIにリクエストする処理をServiceクラスに実装します。
従来のSpring MVCではRestTemplateを利用していましたが、ノンブロッキングな処理を行うことができるWebClientを利用します。

@Service
public class UserService {
    private final WebClient webClient = WebClient.create();

    public Mono<User> findByUserId(String userId) {
        return webClient.get()
                .uri("/users/{userId}" + userId)
                .retrieve()
                .bodyToMono(User.class);
    }
}

@Service
public class ActivityService {
    private final WebClient webClient = WebClient.create();

    public Flux<Plan> findAll(List<String> activityCodes) {
        return webClient.post()
                .uri("/activity")
                .bodyValue(activityCodes)
                .retrieve()
                .bodyToFlux(Activity.class);
    }
}

@Service
public class TicketService {
    private final WebClient webClient = WebClient.create();

    public Flux<Ticket> findAll(List<String> ticketCodes) {
        return webClient.post()
                .uri("/ticket")
                .bodyValue(ticketCodes)
                .retrieve()
                .bodyToFlux(Ticket.class);
    }
}

ReactorクラスにはMono型とFlux型があり、Reactive streams仕様に沿ったPublisherインタフェースを実装しています。

  • Mono型は0...1の要素を処理: 1つの戻り値を表す
  • Flux型はN個の要素を処理: Listのような複数の戻り値を表す

最後にRouterFunctionで定義したHandler内で上記のService層で定義した処理を実行します。

UserHandler.java

@AllArgsConstructor
@Component
public class UserHandler {
    UserService userService;
    ActivityService activityService;
    TicketService ticketService;

    Mono<ServerResponse> listViewedHistory(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(String.class)
                .flatMap(userId -> userService.findByUserId(userId))
                .flatMap(user ->
                        Mono.zip(
                                activityService.findAll(user.getCodes()).collectList(), // Flux→Monoに変換
                                ticketService.findAll(user.getCodes()).collectList() // Flux→Monoに変換
                        )
                )
                .flatMap(aggregation -> {
                    var activities = aggregation.getT1();
                    var tickets = aggregation.getT2();
                    var viewHistory = ViewedHistory.builder().activities(activities).tickets(tickets).build();
                    return ServerResponse.ok()
                            .body(viewHistory, ViewedHistory.class);
                })
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}

リクエストからユーザ情報を取得し、ユーザ情報を利用してアクティビティ、チケット情報を並列処理で取得後、レスポンスに変換しています。
ここで注目したいのはflatMap()Mono.zip()関数です。
MonoまたはFluxを返すような順次処理を実行する場合はflatMap()を使用します。
また、並列で処理を実行したい場合はMono.zip()を使用します。Mono.zip(処理1, 処理2)で指定したものをaggregation.getT1(), aggregation.getT2()でそれぞれ取得できます。
補足ですが、Mono.zipに関しては最大8つまで並列処理を実行することができます。

まとめ

Spring WebFluxを利用するには、非同期プログラミングやリアクティブプログラミングへの慣れが必要で少しとっつきづらい印象ですが、パフォーマンス観点でとても重要な機能なので業務でJavaやSpringFrameworkを利用されている方はぜひ一度触ってみてはいかがでしょうか。

おわりに

アソビューでの開発についてもっと知りたい方はこちらをぜひご覧ください。
カジュアル面談もあります。

www.asoview.com