使用 Kotlin + WebFlux/RxJava 2 實現響應式以及嘗試正式版本的協程

在前一篇文章 ofollow,noindex">《使用 Kotlin + Spring Boot 進行後端開發》 中,曾介紹過嘗試使用 Kotlin 來做後端開發。這一次,嘗試 WebFlux 以及協程。
首先,在build.gradle中新增外掛和依賴的庫。
plugins { id 'java' id 'org.jetbrains.kotlin.jvm' version '1.3.10' id "org.jetbrains.kotlin.plugin.allopen" version "1.3.10" } ext { libraries = [ rxjava: "2.2.2", logback: "1.2.3", spring_boot: "2.1.0.RELEASE", kotlinx_coroutines_core: "1.0.1" ] } group 'com.kotlin.tutorial' version '1.0-SNAPSHOT' sourceCompatibility = 1.8 def libs = rootProject.ext.libraries // 庫 repositories { mavenCentral() } dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8" compile "org.jetbrains.kotlin:kotlin-reflect:1.3.10" testCompile group: 'junit', name: 'junit', version: '4.12' implementation "io.reactivex.rxjava2:rxjava:${libs.rxjava}" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:${libs.kotlinx_coroutines_core}" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${libs.kotlinx_coroutines_core}" implementation "ch.qos.logback:logback-classic:${libs.logback}" implementation "ch.qos.logback:logback-core:${libs.logback}" implementation "ch.qos.logback:logback-access:${libs.logback}" implementation "org.springframework.boot:spring-boot-starter-web:${libs.spring_boot}" implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive:${libs.spring_boot}" } compileKotlin { kotlinOptions.jvmTarget = "1.8" } compileTestKotlin { kotlinOptions.jvmTarget = "1.8" } 複製程式碼
此次,使用了 allopen 外掛。它是官方提供的外掛詳見: kotlinlang.org/docs/refere…
Kotlin 的類預設是final的,一般需要使用 open
關鍵字。使用了allopen外掛就可以節省 open
關鍵字。值得注意的是,需要開啟 Intellij 的 Enable annotation processing 選項。
這樣,建立 SpringKotlinApplication 就不需要使用 open
:
import org.springframework.boot.SpringApplication import org.springframework.boot.autoconfigure.SpringBootApplication /** * Created by tony on 2018/11/13. */ @SpringBootApplication class SpringKotlinApplication fun main(args: Array<String>) { SpringApplication.run(SpringKotlinApplication::class.java, *args) } 複製程式碼
另外,不要忘記配置資料庫的資訊,例子採用的是 MongoDB。
WebFlux
WebFlux 是 Spring 5 新增的特性,相對於傳統 MVC 的同步阻塞IO模型,它採用非同步非阻塞的IO模型。
WebFlux 的 Flux 取自於 Reactor 中的類 Flux。Reactor 是 Spring 5 響應式開發的基礎。
Reactor 是完全基於響應式流規範設計和實現的庫,Flux 和 Mono 是 Reactor 中的兩個基本概念。
Flux 類似 RxJava 的 Observable,它可以觸發零到多個事件,並根據實際情況結束處理或觸發錯誤。Mono 最多隻觸發一個事件,它跟 RxJava 的 Single 和 Maybe 類似,所以可以把 Mono 用於在非同步任務完成時發出通知。
1.1 建立 Model
首先,建立幾個 Model 類。
User 表示使用者物件。
import org.springframework.data.annotation.Id /** * Created by tony on 2018/11/22. */ data class User(@Id val id: String? = null, val name: String, val age: Int, val address: Address) { constructor() : this(null, "", 0, Address()) constructor(name: String, age: Int, address: Address) : this(null, name = name, age = age, address = address) } 複製程式碼
Address 記錄使用者的地址。
import org.springframework.data.annotation.Id /** * Created by tony on 2018/11/22. */ data class Address(@Id val id: String? = null, val number: Int, val street: String, val city: String) { constructor() : this(null, 0, "", "") constructor(number: Int, street: String, city: String) : this(null, number, street, city) } 複製程式碼
Audit 用於記錄使用者操作的時間。
import org.springframework.data.annotation.Id import java.time.LocalDateTime /** * Created by tony on 2018/11/22. */ data class Audit(@Id val id: String? = null, val name: String, val eventDate: LocalDateTime) { constructor() : this(null, "",LocalDateTime.now()) constructor(name: String, eventDate: LocalDateTime) : this(null, name, eventDate) } 複製程式碼
1.2 建立 Repository
建立 UserReactiveRepository 用於 User 物件的查詢操作,它實現 ReactiveMongoRepository 介面。
import com.kotlin.tutorial.model.User import org.springframework.data.mongodb.repository.ReactiveMongoRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Flux /** * Created by tony on 2018/11/22. */ @Repository interface UserReactiveRepository : ReactiveMongoRepository<User, String> { fun findUserByAge(age: Int): Flux<User> fun findUserByAddressCity(city: String): Flux<User> fun findUserByAgeAndAddressCity(age: Int, city: String): Flux<User> } 複製程式碼
建立 AuditRepository 用於查詢使用者最近一條的操作時間。
import com.kotlin.tutorial.model.Audit import org.springframework.data.repository.CrudRepository import org.springframework.stereotype.Repository /** * Created by tony on 2018/11/22. */ @Repository interface AuditRepository: CrudRepository<Audit, String> { fun findFirstByNameOrderByEventDateDesc(name: String): Audit } 複製程式碼
1.3 建立 Service
建立 UserReactiveService,通過依賴注入了 userRepository、auditRepository。
import com.kotlin.tutorial.Utils.toLower import com.kotlin.tutorial.model.Address import com.kotlin.tutorial.model.Audit import com.kotlin.tutorial.model.User import com.kotlin.tutorial.repository.AuditRepository import com.kotlin.tutorial.repository.UserReactiveRepository import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component import reactor.core.publisher.Flux import java.time.LocalDateTime /** * Created by tony on 2018/11/22. */ @Component class UserReactiveService { @Autowired lateinit var userRepository: UserReactiveRepository @Autowired lateinit var auditRepository: AuditRepository companion object { val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower() val streets = listOf("renming road", "zhongshan road").toLower() } fun find(age: Int?, rawCity: String?): Flux<User> { val city = rawCity?.toLowerCase() return when { age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city) city is String -> userRepository.findUserByAddressCity(city) age is Int -> userRepository.findUserByAge(age) else -> userRepository.findAll() } } fun generateData(): Flux<User> { val list = listOf(20, 25, 33, 28, 34).map { val u = generate(it) auditRepository.save(Audit(u.name, LocalDateTime.now())) u } return userRepository.deleteAll().thenMany(userRepository.saveAll(list)) } private fun generate(age: Int): User { val address = Address(age, streets[age % streets.size], cities[age % cities.size]) return User("Tony$age", age, address) } } 複製程式碼
1.4 建立 Controller
建立 UserController 編寫兩個 reactive 的介面:
@RestController @RequestMapping("/user") class UserController { @Autowired lateinit var userReactiveService: UserReactiveService @GetMapping("/reactive/find") fun findByReactive(@RequestParam age: Int?, @RequestParam city: String?) = userReactiveService.find(age, city) @GetMapping("/reactive/generate") fun genDataByReactive() = userReactiveService.generateData() ...... } 複製程式碼
建立使用者的方式:
curl http://localhost:8080/user/reactive/generate 複製程式碼
基於城市查詢使用者的方式:
curl http://localhost:8080/user/reactive/find?city=suzhou 複製程式碼
RxJava 2
RxJava 庫是 JVM 上響應式程式設計的先驅,也是響應式流規範(Reactive Streams)的基礎。
如果對 RxJava 2 不熟悉,也可以購買我的《RxJava 2.x 實戰》
2.1 建立 Repository
建立 UserRxJavaRepository 功能跟 UserReactiveRepository 一樣,只是多了一個 findUserByName() 方法。
import com.kotlin.tutorial.model.User import io.reactivex.Flowable import org.springframework.data.repository.reactive.RxJava2CrudRepository import org.springframework.stereotype.Repository /** * Created by tony on 2018/11/22. */ @Repository interface UserRxJavaRepository : RxJava2CrudRepository<User, String> { fun findUserByName(name: String): Flowable<User> fun findUserByAge(age: Int): Flowable<User> fun findUserByAddressCity(city: String): Flowable<User> fun findUserByAgeAndAddressCity(age: Int, city: String): Flowable<User> } 複製程式碼
2.2 建立 JavaService
建立 UserRxJavaService ,類似於 UserReactiveService。但是,多了兩個方法:findByName()、login()。其中,呼叫 login() 會新增一條審計的記錄。
import com.kotlin.tutorial.Utils.toLower import com.kotlin.tutorial.model.Address import com.kotlin.tutorial.model.Audit import com.kotlin.tutorial.model.User import com.kotlin.tutorial.repository.AuditRepository import com.kotlin.tutorial.repository.UserRxJavaRepository import io.reactivex.Flowable import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component import java.time.LocalDateTime /** * Created by tony on 2018/11/22. */ @Component class UserRxJavaService { @Autowired lateinit var userRepository: UserRxJavaRepository @Autowired lateinit var auditRepository: AuditRepository companion object { val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower() val streets = listOf("renming road", "zhongshan road").toLower() } fun findByName(name: String): Flowable<User> = userRepository.findUserByName(name) fun find(age: Int?, rawCity: String?): Flowable<User> { val city = rawCity?.toLowerCase() return when { age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city) city is String -> userRepository.findUserByAddressCity(city) age is Int -> userRepository.findUserByAge(age) else -> userRepository.findAll() } } fun generateData(): Flowable<User> { val list = listOf(20, 25, 33, 28, 34).map { val u = generate(it) auditRepository.save(Audit(u.name, LocalDateTime.now())) u } return userRepository.deleteAll().andThen(userRepository.saveAll(list)) } private fun generate(age: Int): User { val address = Address(age, streets[age % streets.size], cities[age % cities.size]) return User("Tony$age", age, address) } fun login(name: String) = userRepository.findUserByName(name) .map { auditRepository.save(Audit(it.name, LocalDateTime.now())) } } 複製程式碼
2.3 建立 Controller
在原有的 UserController 中新增兩個 rxjava 的介面:
@RestController @RequestMapping("/user") class UserController { @Autowired lateinit var userRxJavaService: UserRxJavaService @GetMapping("/rxjava/find") fun findByRx(@RequestParam age: Int?, @RequestParam city: String?) = userRxJavaService.find(age, city) @GetMapping("/rxjava/generate") fun genDateByRx() = userRxJavaService.generateData() ...... } 複製程式碼
Kotlin 1.3 的 Coroutines
協程(coroutine)相比於執行緒更加輕量級,協程又稱為微執行緒。執行緒和協程的一個顯著區別是,執行緒的阻塞代價是昂貴的,而協程使用了更簡單、代價更小的掛起(suspend)來代替阻塞。
Coroutines 是 Kotlin 1.1 增加的實驗的功能,到 Kotlin 1.3 已經變成了正式的功能。
先在 UserController 建立一個模擬登陸的介面,訪問該介面時會新增一條審計的記錄
@GetMapping("/rxjava/login") fun mockLogin(@RequestParam username: String) = userRxJavaService.login(username) 複製程式碼
然後嘗試用傳統的 blocking 方式來編寫一個獲取登陸資訊的介面:
@GetMapping("/blocking/{username}") fun getNormalLoginMessage(@PathVariable username: String):String { val user = userService.findByName(username) val lastLoginTime = auditService.findByName(user.name).eventDate return "Hi ${user.name}, you have logged in since $lastLoginTime" } 複製程式碼
再嘗試用 RxJava 的方式來編寫該介面:
@GetMapping("/rxjava/{username}") fun getRxLoginMessage(@PathVariable username: String)= userRxJavaService.findByName(username) .map { auditService.findByName(it.name).eventDate } .map { "Hi ${username}, you have logged in since $it" } 複製程式碼
最後,使用 Coroutines 的方式來編寫介面:
@GetMapping("/coroutine/{username}") fun getLoginMessage(@PathVariable username: String) = runBlocking { val user = userRxJavaService.findByName(username).awaitSingle() val lastLoginTime = GlobalScope.async { auditService.findByName(user.name).eventDate }.await() "Hi ${user.name}, you have logged in since $lastLoginTime" } 複製程式碼
可以看到,使用協程的方式類似於傳統的 blocking 的方式來編寫程式碼。


關於協程,更多可以參考之前寫的 Coroutines 筆記:
Kotlin Coroutines 筆記 (一) 、 Kotlin Coroutines 筆記 (二)
雖然 Kotlin 1.3 之後有些變動,但是大體是不變的。之後,也會整理更多 Kotlin Coroutines 筆記。
總結
響應式開發是未來的趨勢,無論是服務端開發還是移動端開發,都會順應這個趨勢。
另外,Kotlin 1.3 之後的協程已經是正式版本,Kotlin 在語言級別上支援了協程,它是非同步程式設計的另一個不錯的選擇。
本文 demo 的 github 地址: github.com/fengzhizi71…