使用Spring Data R2DBC進行非同步RDBMS訪問 - Lanky Dan Dev Blog
不久前,JDBC驅動程式的反應變體稱為R2DBC釋出了,它允許資料非同步流式傳輸到已訂閱它的任何端點,結合使用像R2DBC這樣的反應式驅動程式和Spring WebFlux,可以編寫一個完整的響應式應用程式來非同步進行資料的接收和傳送。在這篇文章中,我們將重點關注資料庫端:從連線到資料庫,然後最終儲存和檢索資料。
我們使用Spring Data實現資料庫端的反應式應用,與所有Spring Data模組一樣,它為我們提供了開箱即用的配置,可以減少我們需要編寫的樣板程式碼量以獲得我們的應用程式設定。最重要的是,它在資料庫驅動程式上提供了一個層,更容易編制任務變得更容易。
對於這篇文章的內容,我使用的是Postgres資料庫,當然,H2和Microsoft SQL Server都有自己的R2DBC驅動程式實現。
依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-r2dbc</artifactId> <version>1.0.0.M1</version> </dependency> <dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-postgresql</artifactId> <version>1.0.0.M6</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies> <repositories> <repository> <id>repository.spring.milestone</id> <name>Spring Milestone Repository</name> <url>http:<font><i>//repo.spring.io/milestone</url></i></font><font> </repository> </repositories> </font>
使用Spring Boot的次數越多,就越習慣於使用spring-boot-starter匯入單個依賴項。我希望會有spring-boot-starter-r2dbc依賴,但不幸的是,沒有這樣一個依賴。在編寫本文時,它沒有自己的Spring Boot模組,我相信未來會讓R2DBC驅動程式更容易設定。目前,我們需要手動填寫一些額外的依賴項。此外,R2DBC庫只有Milestone版本(更多證明它們是新的)所以我們需要確保引入Spring Milestone庫。
連線到資料庫
感謝Spring Data為我們做了很多工作,需要手動建立的唯一Bean ConnectionFactory包含資料庫的連線細節:
@Configuration @EnableR2dbcRepositories <b>class</b> DatabaseConfiguration( @Value(<font>"\${spring.data.postgres.host}"</font><font>) <b>private</b> val host: String, @Value(</font><font>"\${spring.data.postgres.port}"</font><font>) <b>private</b> val port: Int, @Value(</font><font>"\${spring.data.postgres.database}"</font><font>) <b>private</b> val database: String, @Value(</font><font>"\${spring.data.postgres.username}"</font><font>) <b>private</b> val username: String, @Value(</font><font>"\${spring.data.postgres.password}"</font><font>) <b>private</b> val password: String ) : AbstractR2dbcConfiguration() { override fun connectionFactory(): ConnectionFactory { <b>return</b> PostgresqlConnectionFactory( PostgresqlConnectionConfiguration.builder() .host(host) .port(port) .database(database) .username(username) .password(password).build() ) } } </font>
這裡要注意的第一件事是擴充套件AbstractR2dbcConfiguration。該類包含一堆我們不再需要手動建立的Bean。實現connectionFactory是類的唯一要求,因為建立DatabaseClientBean 需要它。
這種結構是Spring Data模組的典型結構,因此在嘗試不同的模組時會感覺非常熟悉。此外,我希望一旦自動配置可用,就可以刪除這個手動配置,並且只能通過自動配置application.properties驅動。
Spring可以連線到正在執行的Postgres例項的配置:
Postgres的port屬性預設值5432 ;host,database,username和password是PostgresqlConnectionFactory需要的定義,缺少一個會丟擲異常。
這個例子的最後一條值得注意的資訊是使用@EnableR2dbcRepositories。此註釋指示Spring查詢擴充套件Spring Repository介面的任何儲存庫介面。這用作檢測Spring Data儲存庫的基礎介面。我們將在下一節中進一步瞭解這一點。要從中獲取的主要資訊是您需要使用@EnableR2dbcRepositories註釋來充分利用Spring Data的功能。
建立Spring Data Repository
如上所述,在本節中,我們將介紹新增Spring Data Repository。這些儲存庫是Spring Data的一個很好的特性,這意味著您不需要編寫大量額外程式碼來編寫查詢。不幸的是,至少就目前而言,Spring R2DBC不能像其他Spring Data模組那樣進行推斷查詢(我相信這會在某些時候新增)。這意味著您需要使用@Query註釋並手動編寫SQL。讓我們來看看:
@Repository <b>interface</b> PersonRepository : R2dbcRepository<Person, Int> { @Query(<font>"SELECT * FROM people WHERE name = $1"</font><font>) fun findAllByName(name: String): Flux<Person> @Query(</font><font>"SELECT * FROM people WHERE age = $1"</font><font>) fun findAllByAge(age: Int): Flux<Person> } </font>
此介面擴充套件R2dbcRepository。又擴充套件了ReactiveCrudRepository,ReactiveCrudRepository提供標準的CRUD功能,據我所知,R2dbcRepository它不提供任何額外的功能,而是為更好的上下文命名而建立的介面。
R2dbcRepository接受兩個通用引數,一個是作為輸入並作為輸出生成的實體類。第二個是主鍵的型別。因此,在這種情況下,Person類由PersonRepository(有意義)管理,內部的主鍵欄位Person是Int。
在這個類中函式的返回型別是ReactiveCrudRepository提供的Flux和Mono,這些是Spring使用的Project Reactor型別,作為預設的Reactive Stream型別。Flux表示多個元素的流,而 Mono表示單個結果。
最後,正如我之前在示例中提到的,每個函式都使用註釋@Query。語法非常簡單,SQL是註釋中的一個字串。$1($2,$3等...更多輸入)表示輸入到函式的值。完成此操作後,Spring將處理其餘內容並將輸入傳遞到各自的輸入引數中,收集結果並將其對映到儲存庫的指定實體類。
快速查詢實體
這裡不多說,只是簡單地展示了Person使用的類PersonRepository。
@Table(<font>"people"</font><font>) data <b>class</b> Person( @Id val id: Int? = <b>null</b>, val name: String, val age: Int ) </font>
id已被設為可為空並提供null預設值以允許Postgres自己生成下一個合適的值。如果主鍵不是可空null的並且id提供了值,則Spring實際上會嘗試在儲存時執行更新而不是插入。
該實體將對映到people下面定義的表:
CREATE TABLE people ( id SERIAL PRIMARY KEY, name VARCHAR NOT NULL, age INTEGER NOT NULL );
看看發生什麼?
現在讓我們來看看它實際上在做什麼。下面是一些程式碼,它們插入一些記錄並以幾種不同的方式檢索它們:
@SpringBootApplication <b>class</b> Application : CommandLineRunner { @Autowired <b>private</b> lateinit <b>var</b> personRepository: PersonRepository override fun run(vararg args: String?) { personRepository.saveAll( listOf( Person(name = <font>"Dan Newton"</font><font>, age = 25), Person(name = </font><font>"Laura So"</font><font>, age = 23) ) ).log().subscribe() personRepository.findAll().subscribe { log.info(</font><font>"findAll - $it"</font><font>) } personRepository.findAllById(Mono.just(1)).subscribe { log.info(</font><font>"findAllById - $it"</font><font>) } personRepository.findAllByName(</font><font>"Laura So"</font><font>).subscribe { log.info(</font><font>"findAllByName - $it"</font><font>) } personRepository.findAllByAge(25).subscribe { log.info(</font><font>"findAllByAge - $it"</font><font>) } } } </font>
這段程式碼實際上有可能沒有實際插入或讀取某些記錄,反應式應用程式意味著非同步執行操作,因此該應用程式已開始在不同的執行緒中處理函式呼叫,不阻塞主執行緒,這些非同步程序可能永遠不會完全執行。出於這個原因,在這段程式碼中應該呼叫Thread.sleep,但我從示例中刪除它們以保持一切都很整潔。
執行上面程式碼的輸出如下所示:
2019-02-11 09:04:52.294INFO 13226 --- [main] reactor.Flux.ConcatMap.1: onSubscribe(FluxConcatMap.ConcatMapImmediate) 2019-02-11 09:04:52.295INFO 13226 --- [main] reactor.Flux.ConcatMap.1: request(unbounded) 2019-02-11 09:04:52.572INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1: onNext(Person(id=35, name=Dan Newton, age=25)) 2019-02-11 09:04:52.591INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1: onNext(Person(id=36, name=Laura So, age=23)) 2019-02-11 09:04:52.591INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1: onComplete() 2019-02-11 09:04:54.472INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application: findAll - Person(id=35, name=Dan Newton, age=25) 2019-02-11 09:04:54.473INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application: findAll - Person(id=36, name=Laura So, age=23) 2019-02-11 09:04:54.512INFO 13226 --- [actor-tcp-nio-4] com.lankydanblog.tutorial.Application: findAllByName - Person(id=36, name=Laura So, age=23) 2019-02-11 09:04:54.524INFO 13226 --- [actor-tcp-nio-5] com.lankydanblog.tutorial.Application: findAllByAge - Person(id=35, name=Dan Newton, age=25)
說明:
- onSubscribe和request發生在Flux呼叫它的主執行緒上。只有saveAll輸出,因為它包含了log功能。將其新增到其他呼叫中也會記錄主執行緒的相同結果。
- subscribe函式中包含的執行和它們的內部步驟Flux在不同的執行緒上執行。
這並不是真實地表示如何在實際應用程式中使用Reactive Streams,而是希望演示如何使用它們並對它們的執行方式有一些瞭解。
結論
總而言之,Reactive Streams已經出現在一些RDBMS資料庫中,這要歸功於R2DBC驅動程式和Spring Data,它們在頂層構建了一層,使一切變得更加整潔。通過使用Spring Data R2DBC,我們可以建立與資料庫的連線並開始查詢它,而無需太多程式碼。儘管Spring已經為我們做了很多事情,但它可能會做得更多。目前,它沒有Spring Boot自動配置支援。這有點煩人。但是,我確信有人會盡快做到這一點,並使一切都比現在更好。
這篇文章中使用的程式碼可以在我的GitHub上找到 。