使用Spring Data R2DBC进行异步RDBMS访问 - Lanky Dan Dev Blog

19-02-17 banq
                   

不久前,JDBC驱动程序的反应变体称为R2DBC发布了,它允许数据异步流式传输到已订阅它的任何端点,结合使用像R2DBC这样的反应式驱动程序和Spring WebFlux,可以编写一个完整的响应式应用程序来异步进行数据的接收和发送。在这篇文章中,我们将重点关注数据库端:从连接到数据库,然后最终保存和检索数据。

我们使用Spring Data实现数据库端的反应式应用,与所有Spring Data模块一样,它为我们提供了开箱即用的配置,可以减少我们需要编写的样板代码量以获得我们的应用程序设置。最重要的是,它在数据库驱动程序上提供了一个层,更容易编制任务变得更容易。

对于这篇文章的内容,我使用的是Postgres数据库,当然,H2和Microsoft SQL Server都有自己的R2DBC驱动程序实现。

依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-r2dbc</artifactId>
    <version>1.0.0.M1</version>
  </dependency>
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>1.0.0.M6</version>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>repository.spring.milestone</id>
    <name>Spring Milestone Repository</name>
    <url>http://repo.spring.io/milestone</url>
  </repository>
</repositories>

使用Spring Boot的次数越多,就越习惯于使用spring-boot-starter导入单个依赖项。我希望会有spring-boot-starter-r2dbc依赖,但不幸的是,没有这样一个依赖。在编写本文时,它没有自己的Spring Boot模块,我相信未来会让R2DBC驱动程序更容易设置。目前,我们需要手动填写一些额外的依赖项。此外,R2DBC库只有Milestone版本(更多证明它们是新的)所以我们需要确保引入Spring Milestone库。

连接到数据库

感谢Spring Data为我们做了很多工作,需要手动创建的唯一Bean ConnectionFactory包含数据库的连接细节:

@Configuration
@EnableR2dbcRepositories
class DatabaseConfiguration(
  @Value("\${spring.data.postgres.host}") private val host: String,
  @Value("\${spring.data.postgres.port}") private val port: Int,
  @Value("\${spring.data.postgres.database}") private val database: String,
  @Value("\${spring.data.postgres.username}") private val username: String,
  @Value("\${spring.data.postgres.password}") private val password: String
) : AbstractR2dbcConfiguration() {

  override fun connectionFactory(): ConnectionFactory {
    return PostgresqlConnectionFactory(
      PostgresqlConnectionConfiguration.builder()
        .host(host)
        .port(port)
        .database(database)
        .username(username)
        .password(password).build()
    )
  }
}

这里要注意的第一件事是扩展AbstractR2dbcConfiguration。该类包含一堆我们不再需要手动创建的Bean。实现connectionFactory是类的唯一要求,因为创建DatabaseClientBean 需要它。

这种结构是Spring Data模块的典型结构,因此在尝试不同的模块时会感觉非常熟悉。此外,我希望一旦自动配置可用,就可以删除这个手动配置,并且只能通过自动配置application.properties驱动。

Spring可以连接到正在运行的Postgres实例的配置:

Postgres的port属性默认值5432 ;host,database,username和password是PostgresqlConnectionFactory需要的定义,缺少一个会抛出异常。

这个例子的最后一条值得注意的信息是使用@EnableR2dbcRepositories。此注释指示Spring查找扩展Spring Repository接口的任何存储库接口。这用作检测Spring Data存储库的基础接口。我们将在下一节中进一步了解这一点。要从中获取的主要信息是您需要使用@EnableR2dbcRepositories注释来充分利用Spring Data的功能。

创建Spring Data Repository

如上所述,在本节中,我们将介绍添加Spring Data Repository。这些存储库是Spring Data的一个很好的特性,这意味着您不需要编写大量额外代码来编写查询。不幸的是,至少就目前而言,Spring R2DBC不能像其他Spring Data模块那样进行推断查询(我相信这会在某些时候添加)。这意味着您需要使用@Query注释并手动编写SQL。让我们来看看:

@Repository
interface PersonRepository : R2dbcRepository<Person, Int> {

  @Query("SELECT * FROM people WHERE name = $1")
  fun findAllByName(name: String): Flux<Person>

  @Query("SELECT * FROM people WHERE age = $1")
  fun findAllByAge(age: Int): Flux<Person>
}

此接口扩展R2dbcRepository。又扩展了ReactiveCrudRepository,ReactiveCrudRepository提供标准的CRUD功能,据我所知,R2dbcRepository它不提供任何额外的功能,而是为更好的上下文命名而创建的接口。

R2dbcRepository接受两个通用参数,一个是作为输入并作为输出生成的实体类。第二个是主键的类型。因此,在这种情况下,Person类由PersonRepository(有意义)管理,内部的主键字段Person是Int。

在这个类中函数的返回类型是ReactiveCrudRepository提供的Flux和Mono,这些是Spring使用的Project Reactor类型,作为默认的Reactive Stream类型。Flux表示多个元素的流,而 Mono表示单个结果。

最后,正如我之前在示例中提到的,每个函数都使用注释@Query。语法非常简单,SQL是注释中的一个字符串。$1($2,$3等...更多输入)表示输入到函数的值。完成此操作后,Spring将处理其余内容并将输入传递到各自的输入参数中,收集结果并将其映射到存储库的指定实体类。

快速查询实体

这里不多说,只是简单地展示了Person使用的类PersonRepository。

@Table("people")
data class Person(
  @Id val id: Int? = null,
  val name: String,
  val age: Int
)

id已被设为可为空并提供null默认值以允许Postgres自己生成下一个合适的值。如果主键不是可空null的并且id提供了值,则Spring实际上会尝试在保存时运行更新而不是插入。

该实体将映射到people下面定义的表:

CREATE TABLE people (
  id SERIAL PRIMARY KEY, 
  name VARCHAR NOT NULL, 
  age INTEGER NOT NULL
);

看看发生什么?

现在让我们来看看它实际上在做什么。下面是一些代码,它们插入一些记录并以几种不同的方式检索它们:

@SpringBootApplication
class Application : CommandLineRunner {

  @Autowired
  private lateinit var personRepository: PersonRepository

  override fun run(vararg args: String?) {
    personRepository.saveAll(
      listOf(
        Person(name = "Dan Newton", age = 25),
        Person(name = "Laura So", age = 23)
      )
    ).log().subscribe()
    personRepository.findAll().subscribe { log.info("findAll - $it") }
    personRepository.findAllById(Mono.just(1)).subscribe { log.info("findAllById - $it") }
    personRepository.findAllByName("Laura So").subscribe { log.info("findAllByName - $it") }
    personRepository.findAllByAge(25).subscribe { log.info("findAllByAge - $it") }
  }
}

这段代码实际上有可能没有实际插入或读取某些记录,反应式应用程序意味着异步执行操作,因此该应用程序已开始在不同的线程中处理函数调用,不阻塞主线程,这些异步进程可能永远不会完全执行。出于这个原因,在这段代码中应该调用Thread.sleep,但我从示例中删除它们以保持一切都很整洁。

运行上面代码的输出如下所示:

2019-02-11 09:04:52.294  INFO 13226 --- [           main] reactor.Flux.ConcatMap.1                 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-02-11 09:04:52.295  INFO 13226 --- [           main] reactor.Flux.ConcatMap.1                 : request(unbounded)
2019-02-11 09:04:52.572  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onNext(Person(id=35, name=Dan Newton, age=25))
2019-02-11 09:04:52.591  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onNext(Person(id=36, name=Laura So, age=23))
2019-02-11 09:04:52.591  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onComplete()
2019-02-11 09:04:54.472  INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application    : findAll - Person(id=35, name=Dan Newton, age=25)
2019-02-11 09:04:54.473  INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application    : findAll - Person(id=36, name=Laura So, age=23)
2019-02-11 09:04:54.512  INFO 13226 --- [actor-tcp-nio-4] com.lankydanblog.tutorial.Application    : findAllByName - Person(id=36, name=Laura So, age=23)
2019-02-11 09:04:54.524  INFO 13226 --- [actor-tcp-nio-5] com.lankydanblog.tutorial.Application    : findAllByAge - Person(id=35, name=Dan Newton, age=25)

说明:

  • onSubscribe和request发生在Flux调用它的主线程上。只有saveAll输出,因为它包含了log功能。将其添加到其他调用中也会记录主线程的相同结果。
  • subscribe函数中包含的执行和它们的内部步骤Flux在不同的线程上运行。

这并不是真实地表示如何在实际应用程序中使用Reactive Streams,而是希望演示如何使用它们并对它们的执行方式有一些了解。

结论

总而言之,Reactive Streams已经出现在一些RDBMS数据库中,这要归功于R2DBC驱动程序和Spring Data,它们在顶层构建了一层,使一切变得更加整洁。通过使用Spring Data R2DBC,我们可以创建与数据库的连接并开始查询它,而无需太多代码。尽管Spring已经为我们做了很多事情,但它可能会做得更多。目前,它没有Spring Boot自动配置支持。这有点烦人。但是,我确信有人会尽快做到这一点,并使一切都比现在更好。

这篇文章中使用的代码可以在我的GitHub上找到。​​​​​​​

                   

1