在本文中,我们探讨了使用 Stream API 操作ResultSet的几种方法。这种方法在处理无法一次性加载到内存中的大型数据集时特别有用。此外,如果我们在应用程序中遵循函数式风格,流式存储库将与我们的逻辑很好地保持一致。
遍历ResultSet是从 JDBC 查询中检索数据的常用方法。但是,在某些情况下,我们可能更喜欢使用记录流。
在本文中,我们将探讨使用Stream API处理ResultSet 的几种方法。
1. 使用拆分器
我们将从纯 JDK 方法开始,使用Spliterators来创建流。
首先,让我们为实体定义一个模型:
| public record CityRecord(String city, String country) {}
 
 | 
在我们的CityRecord中,我们存储了有关该城市及其国家的信息。
接下来,让我们创建一个与数据库交互并返回CityRecord实例流的存储库:
| public class JDBCStreamAPIRepository {private static final String QUERY = "SELECT name, country FROM cities";
 private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class);
 public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection)
 throws SQLException {
 PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
 connection.setAutoCommit(false);
 preparedStatement.setFetchSize(5000);
 ResultSet resultSet = preparedStatement.executeQuery();
 return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>(
 Long.MAX_VALUE, Spliterator.ORDERED) {
 @Override
 public boolean tryAdvance(Consumer<? super CityRecord> action) {
 try {
 if(!resultSet.next()) return false;
 action.accept(createCityRecord(resultSet));
 return true;
 } catch(SQLException ex) {
 throw new RuntimeException(ex);
 }
 }
 }, false);
 }
 private CityRecord createCityRecord(ResultSet resultSet) throws SQLException {
 return new CityRecord(resultSet.getString(1), resultSet.getString(2));
 }
 }
 
 | 
我们创建了一个PreparedStatement来检索cities表中的所有项目,并指定提取大小以控制内存消耗。我们使用AbstractSpliterator来生成流,只要ResultSet有更多值,就会生成新记录。此外,我们使用createCityRecord方法将每一行映射到CityRecord。
最后,让我们为我们的存储库编写一个测试:
| public class JDBCResultSetWithStreamAPIUnitTest {private static Connection connection = null;
 private static final String JDBC_URL = "jdbc:h2:mem:testDatabase";
 private static final String USERNAME = "dbUser";
 private static final String PASSWORD = "dbPassword";
 JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository();
 @BeforeEach
 void setup() throws Exception {
 connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
 initialDataSetup();
 }
 private void initialDataSetup() throws SQLException {
 Statement statement = connection.createStatement();
 String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))";
 statement.execute(ddlQuery);
 List<String> sqlQueryList = Arrays.asList(
 "INSERT INTO cities VALUES ('London', 'United Kingdom')",
 "INSERT INTO cities VALUES ('Sydney', 'Australia')",
 "INSERT INTO cities VALUES ('Bucharest', 'Romania')"
 );
 for (String query : sqlQueryList) {
 statement.execute(query);
 }
 }
 @Test
 void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException {
 Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
 .getCitiesStreamUsingSpliterator(connection);
 List<CityRecord> cities = cityRecords.toList();
 assertThat(cities)
 .containsExactly(
 new CityRecord("London", "United Kingdom"),
 new CityRecord("Sydney", "Australia"),
 new CityRecord("Bucharest", "Romania"));
 }
 
 | 
我们建立与H2 数据库的连接,并在测试之前准备包含一些条目的城市表。最后,我们验证我们的存储库是否以流的形式从表中返回所有预期项目。
2.使用 JOOQ
JOOQ是一个用于处理关系数据库的流行库。它已经提供了从ResultSet检索记录流的方法。
让我们首先添加必要的依赖项:
| <dependency><groupId>org.jooq</groupId>
 <artifactId>jooq</artifactId>
 <version>3.19.11</version>
 </dependency>
 
 | 
接下来,让我们向JDBCStreamAPIRepository添加一个新方法:
| public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)throws SQLException {
 PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
 connection.setAutoCommit(false);
 preparedStatement.setFetchSize(5000);
 ResultSet resultSet = preparedStatement.executeQuery();
 return DSL.using(connection)
 .fetchStream(resultSet)
 .map(r -> new CityRecord(r.get("NAME", String.class),
 r.get("COUNTRY", String.class)))];
 }
 
 | 
我们使用ResultQuery类中的fetchStream()方法从ResultSet构建记录流。此外,我们将 JOOQ 记录映射到CityRecord实例,然后再从方法返回它们。
让我们调用新方法并验证其是否正确运行:
| @Testvoid givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {
 Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
 .getCitiesStreamUsingJOOQ(connection);
 List<CityRecord> cities = cityRecords.toList();
 assertThat(cities)
 .containsExactly(
 new CityRecord("London", "United Kingdom"),
 new CityRecord("Sydney", "Australia"),
 new CityRecord("Bucharest", "Romania"));
 }
 
 | 
正如预期的那样,我们从流中的数据库中检索了所有城市记录。
3.使用jdbc-stream
或者,我们可以使用名为jdbc-stream 的轻量级库从ResultSet创建流。
让我们添加它的依赖项:
| <dependency><groupId>com.github.juliomarcopineda</groupId>
 <artifactId>jdbc-stream</artifactId>
 <version>0.1.1</version>
 </dependency>
 
 | 
现在,让我们向JDBCStreamAPIRepository添加一个新方法:
| public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)throws SQLException {
 PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
 connection.setAutoCommit(false);
 preparedStatement.setFetchSize(5000);
 ResultSet resultSet = preparedStatement.executeQuery();
 return JdbcStream.stream(resultSet)
 .map(r -> {
 try {
 return createCityRecord(resultSet);
 } catch (SQLException e) {
 throw new RuntimeException(e);
 }
 });
 }
 
 | 
我们使用JdbcStream从ResultSet构建流。在底层,它使用 Spliterators 并使用与我们自己的实现相同的逻辑构建流。
现在,我们将检查新的存储库方法如何工作:
| @Testvoid givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
 Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
 .getCitiesStreamUsingJdbcStream(connection);
 List<CityRecord> cities = cityRecords.toList();
 assertThat(cities)
 .containsExactly(
 new CityRecord("London", "United Kingdom"),
 new CityRecord("Sydney", "Australia"),
 new CityRecord("Bucharest", "Romania"));
 }
 
 | 
我们已经使用 jdbc-stream 库获得了所有预期的项目。
关闭资源
使用 JDBC 时,我们必须关闭所有使用的资源以避免连接泄漏。常见的做法是在Connection、PreparedStatement和ResultSet周围使用try-with-resources语法。但是,在使用流时,这种方法并不合适。如果我们从存储库方法返回一个流,则所有资源都将关闭,并且流上的任何操作都将无法访问它们。
为了避免这个问题,我们需要使用 流的onClose()方法关闭所有资源。此外,我们必须确保在使用完流后关闭该流。
让我们修改我们的存储库方法以包含资源关闭逻辑:
| public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)throws SQLException {
 PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
 connection.setAutoCommit(false);
 preparedStatement.setFetchSize(5000);
 ResultSet resultSet = preparedStatement.executeQuery();
 return JdbcStream.stream(resultSet)
 .map(r -> {
 try {
 return createCityRecord(resultSet);
 } catch (SQLException e) {
 throw new RuntimeException(e);
 }
 })
 .onClose(() -> closeResources(connection, resultSet, preparedStatement));
 }
 private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
 try {
 resultSet.close();
 preparedStatement.close();
 connection.close();
 logger.info("Resources closed");
 } catch (SQLException e) {
 throw new RuntimeException(e);
 }
 }
 
 | 
我们添加了closeResources()方法并将其附加到onClose()流处理程序。
现在,让我们修改客户端代码以确保使用后关闭流:
| @Testvoid givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
 Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
 .getCitiesStreamUsingJdbcStream(connection);
 List<CityRecord> cities = cityRecords.toList();
 cityRecords.close();
 assertThat(cities)
 .containsExactly(
 new CityRecord("London", "United Kingdom"),
 new CityRecord("Sydney", "Australia"),
 new CityRecord("Bucharest", "Romania"));
 }
 
 | 
在这里,我们在处理完所有项目后关闭流。此外,我们可以观察到一条日志消息,表明所有资源都已关闭:
[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed