R2DBC,Spring Data JDBC和WebFlux案例介绍

18-10-19 banq
              

本文有关Spring响应式编程最新技术示例。

Spring WebFlux已经在Spring 5和Spring Boot 2中引入,Spring 5还引入了支持NoSQL数据库如Cassandra,MongoDB或Couchbase反应式访问的库包。通过R2DBC实现访问关系数据库的反应性支持。

值得一提的是关于Spring Data JDBC的一些话。该项目已经发布,可在1.0版本下使用。它是更大的Spring Data框架的一部分。它提供了基于JDBC的存储库抽象。创建该库的主要原因是允许使用Spring Data方式访问关系数据库(通过CrudRepository接口)不包括JPA库到应用程序依赖项。当然,JPA仍然是用于Java应用程序的主要持久性API。Spring Data JDBC的目的是通过不实现延迟加载,缓存,脏上下文,会话等流行模式,在概念上比JPA简单得多。它还仅对基于注释的映射提供非常有限的支持。最后,它提供了使用R2DBC访问关系数据库的反应式存储库的实现。虽然该模块仍在开发中(只有SNAPSHOT版本可用),但我们将尝试在我们的演示应用程序中使用它。让我们继续实施。

包括依赖项

我们使用Kotlin来实现。首先,我们包含一些必需的Kotlin依赖项。

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
  <version>${kotlin.version}</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-test-junit</artifactId>
  <version>${kotlin.version}</version>
  <scope>test</scope>
</dependency>

我们还应该添加kotlin-maven-plugin对Spring的支持。

<plugin>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-maven-plugin</artifactId>
  <version>${kotlin.version}</version>
  <executions>
    <execution>
      <id>compile</id>
      <phase>compile</phase>
      <goals>
        <goal>compile</goal>
      </goals>
    </execution>
    <execution>
      <id>test-compile</id>
      <phase>test-compile</phase>
      <goals>
        <goal>test-compile</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <args>
      <arg>-Xjsr305=strict</arg>
    </args>
    <compilerPlugins>
      <plugin>spring</plugin>
    </compilerPlugins>
  </configuration>
</plugin>

然后,我们可以继续包括演示实现所需的框架。我们需要包含专用于使用R2DBC访问数据库的特殊SNAPSHOT版本的Spring Data JDBC。我们还必须添加一些R2DBC库和Spring WebFlux。正如您在下面看到的,只有Spring WebFlux可用于稳定版本(作为Spring Boot RELEASE的一部分)。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-jdbc</artifactId>
  <version>1.0.0.r2dbc-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-spi</artifactId>
  <version>1.0.0.M5</version>
</dependency>
<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>1.0.0.M5</version>
</dependency>
为Spring Data项目设置依赖项管理也很重要。

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-releasetrain</artifactId>
      <version>Lovelace-RELEASE</version>
      <scope>import</scope>
      <type>pom</type>
    </dependency>
  </dependencies>
</dependencyManagement>

我们使用著名名的Spring Data风格的CRUD存储库实现。在这种情况下,我们需要创建扩展ReactiveCrudRepository接口的接口。

这是用于管理Employee对象的存储库的实现。

interface EmployeeRepository : ReactiveCrudRepository<Employee, Int< {
    @Query("select id, name, salary, organization_id from employee e where e.organization_id = $1")
    fun findByOrganizationId(organizationId: Int) : Flux<Employee>
}

这是存储库的另一个实现 - 这次是管理Organization对象。

interface OrganizationRepository : ReactiveCrudRepository<Organization, Int< {
}

实体和DTO

Kotlin通过将实体类声明为数据类来提供创建实体类的便捷方法。使用Spring Data JDBC时,我们必须通过使用注释字段来为实体设置主键@Id。它假定密钥由数据库自动递增。如果未使用自动增量列,则必须使用BeforeSaveEvent侦听器,该侦听器设置实体的ID。但是,我试图为我的实体设置这样的监听器,但它只是不能与Spring Data JDBC的反应式版本一起使用。

这是Employee实体类的实现。值得一提的是Spring Data JDBC会自动将类字段映射organizationId到数据库列organization_id。

data class Employee(val name: String, val salary: Int, val organizationId: Int) {
    @Id 
    var id: Int? = null
}

这是Organization实体类的实现。

data class Organization(var name: String) {
    @Id 
    var id: Int? = null
}

R2DBC不支持任何列表或集合。因为我想Organization在一个API端点中返回带有员工内部对象的列表,所以我创建了包含如下所示列表的DTO。

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList = ArrayList()
    constructor(employees: MutableList) : this(null, "") {
        this.employees = employees
    }
}

与创建的实体对应的SQL脚本在下面可见。字段类型serial将自动创建序列并将其附加到字段id。

CREATE TABLE employee (
    name character varying NOT NULL,
    salary integer NOT NULL,
    id serial PRIMARY KEY,
    organization_id integer
);
CREATE TABLE organization (
    name character varying NOT NULL,
    id serial PRIMARY KEY
);

构建示例Web应用程序

为了演示目的,我们将构建两个独立的应用程序employee-service和organization-service。应用程序organization-service正在employee-service使用WebFlux进行通信WebClient。它获取分配给组织的员工列表,并包括它们与Organization对象一起响应。示例应用程序源代码可在GitHub上的存储库sample-spring-data-webflux下获得:https://github.com/piomin/sample-spring-data-webflux。

好的,让我们从声明Spring Boot主类开始吧。我们需要通过使用注释主类来启用Spring Data JDBC存储库@EnableJdbcRepositories。

@SpringBootApplication
@EnableJdbcRepositories
class EmployeeApplication

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}

使用R2DBC和Postgres需要一些配置。可能由于Spring Data JDBC和R2DBC开发的早期阶段,Postgres没有Spring Boot自动配置。我们需要在@Configurationbean中声明连接工厂,客户端和存储库。

@Configuration
class EmployeeConfiguration {

    @Bean
    fun repository(factory: R2dbcRepositoryFactory): EmployeeRepository {
        return factory.getRepository(EmployeeRepository::class.java)
    }

    @Bean
    fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
        val context = RelationalMappingContext()
        context.afterPropertiesSet()
        return R2dbcRepositoryFactory(client, context)
    }

    @Bean
    fun databaseClient(factory: ConnectionFactory): DatabaseClient {
        return DatabaseClient.builder().connectionFactory(factory).build()
    }

    @Bean
    fun connectionFactory(): PostgresqlConnectionFactory {
        val config = PostgresqlConnectionConfiguration.builder() //
                .host("192.168.99.100") //
                .port(5432) //
                .database("reactive") //
                .username("reactive") //
                .password("reactive123") //
                .build()

        return PostgresqlConnectionFactory(config)
    }

}

最后,我们可以创建包含我们的反应API方法定义的REST控制器。使用Kotlin它不会占用太多空间。以下控制器定义包含三种GET方法,这些方法允许按ID查找所有员工,分配给给定组织的所有员工或单个员工。

@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository : EmployeeRepository

    @GetMapping
    fun findAll() : Flux<Employee> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Employee> = repository.findById(id)

    @GetMapping("/organization/{organizationId}")
    fun findByorganizationId(@PathVariable organizationId : Int) : Flux<Employee> = repository.findByOrganizationId(organizationId)

    @PostMapping
    fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

服务间通信

因为OrganizationController实现有点复杂。因为organization-service正在与之通信employee-service,我们首先需要声明反应式WebFlux WebClient构建器。

@Bean
fun clientBuilder() : WebClient.Builder {
  return WebClient.builder()
}

然后,类似于存储库bean,构建器被注入控制器。它用于内部findByIdWithEmployees方法调用方法GET /employees/organization/{organizationId}通过暴露employee-service。正如您在下面的代码片段中看到的那样,它提供了响应式API和Flux包含已找到员工列表的返回对象。OrganizationDTO使用zipWithReactor方法将此列表注入对象。

@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository : OrganizationRepository
    @Autowired
    lateinit var clientBuilder : WebClient.Builder

    @GetMapping
    fun findAll() : Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Organization> = repository.findById(id)

    @GetMapping("/{id}/withEmployees")
    fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
        val employees : Flux<Employee> = clientBuilder.build().get().uri("http://localhost:8090/employees/organization/$id")
                .retrieve().bodyToFlux(Employee::class.java)
        val org : Mono = repository.findById(id)
        return org.zipWith(employees.collectList())
                .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody employee: Organization) : Mono<Organization> = repository.save(employee)

}

这个怎么运行?

在运行测试之前,我们需要启动Postgres数据库。这是用于运行Postgres容器的Docker命令。它正在创建具有密码的用户,并设置默认数据库。

$ docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=reactive -e POSTGRES_PASSWORD=reactive123 -e POSTGRES_DB=reactive postgres

然后我们需要创建一些测试表,因此您必须运行放置在实现实体和DTO部分中的SQL脚本。之后,您可以开始我们的测试应用程序。如果不覆盖application.yml文件中提供的默认设置,employee-service则在端口8090和organization-service端口8095上进行侦听。下图说明了我们的示例系统的体系结构。

弹簧数据-1

现在,让我们使用应用程序公开的反应API添加一些测试数据。

$ curl -d '{"name":"Test1"}' -H "Content-Type: application/json" -X POST http://localhost:8095/organizations

$ curl -d '{"name":"Name1", "balance":5000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees

$ curl -d '{"name":"Name2", "balance":10000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees

最后,您可以调用GET organizations/{id}/withEmployees方法。

至此,响应式reactive API构成了,github