在本文中,我们探讨了使用 Stream API 操作ResultSet的几种方法。这种方法在处理无法一次性加载到内存中的大型数据集时特别有用。此外,如果我们在应用程序中遵循函数式风格,流式存储库将与我们的逻辑很好地保持一致。
遍历ResultSet是从 JDBC 查询中检索数据的常用方法。但是,在某些情况下,我们可能更喜欢使用记录流。
在本文中,我们将探讨使用Stream API处理ResultSet 的几种方法。
1. 使用拆分器
我们将从纯 JDK 方法开始,使用Spliterators来创建流。
首先,让我们为实体定义一个模型:
public record CityRecord(String city, String country) { }
|
在我们的CityRecord中,我们存储了有关该城市及其国家的信息。
接下来,让我们创建一个与数据库交互并返回CityRecord实例流的存储库:
public class JDBCStreamAPIRepository { private static final String QUERY = "SELECT name, country FROM cities"; private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class); public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(QUERY); connection.setAutoCommit(false); preparedStatement.setFetchSize(5000); ResultSet resultSet = preparedStatement.executeQuery(); return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>( Long.MAX_VALUE, Spliterator.ORDERED) { @Override public boolean tryAdvance(Consumer<? super CityRecord> action) { try { if(!resultSet.next()) return false; action.accept(createCityRecord(resultSet)); return true; } catch(SQLException ex) { throw new RuntimeException(ex); } } }, false); } private CityRecord createCityRecord(ResultSet resultSet) throws SQLException { return new CityRecord(resultSet.getString(1), resultSet.getString(2)); } }
|
我们创建了一个PreparedStatement来检索cities表中的所有项目,并指定提取大小以控制内存消耗。我们使用AbstractSpliterator来生成流,只要ResultSet有更多值,就会生成新记录。此外,我们使用createCityRecord方法将每一行映射到CityRecord。
最后,让我们为我们的存储库编写一个测试:
public class JDBCResultSetWithStreamAPIUnitTest { private static Connection connection = null; private static final String JDBC_URL = "jdbc:h2:mem:testDatabase"; private static final String USERNAME = "dbUser"; private static final String PASSWORD = "dbPassword"; JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository(); @BeforeEach void setup() throws Exception { connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD); initialDataSetup(); } private void initialDataSetup() throws SQLException { Statement statement = connection.createStatement(); String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))"; statement.execute(ddlQuery); List<String> sqlQueryList = Arrays.asList( "INSERT INTO cities VALUES ('London', 'United Kingdom')", "INSERT INTO cities VALUES ('Sydney', 'Australia')", "INSERT INTO cities VALUES ('Bucharest', 'Romania')" ); for (String query : sqlQueryList) { statement.execute(query); } } @Test void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException { Stream<CityRecord> cityRecords = jdbcStreamAPIRepository .getCitiesStreamUsingSpliterator(connection); List<CityRecord> cities = cityRecords.toList(); assertThat(cities) .containsExactly( new CityRecord("London", "United Kingdom"), new CityRecord("Sydney", "Australia"), new CityRecord("Bucharest", "Romania")); }
|
我们建立与H2 数据库的连接,并在测试之前准备包含一些条目的城市表。最后,我们验证我们的存储库是否以流的形式从表中返回所有预期项目。
2.使用 JOOQ
JOOQ是一个用于处理关系数据库的流行库。它已经提供了从ResultSet检索记录流的方法。
让我们首先添加必要的依赖项:
<dependency> <groupId>org.jooq</groupId> <artifactId>jooq</artifactId> <version>3.19.11</version> </dependency>
|
接下来,让我们向JDBCStreamAPIRepository添加一个新方法:
public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(QUERY); connection.setAutoCommit(false); preparedStatement.setFetchSize(5000); ResultSet resultSet = preparedStatement.executeQuery(); return DSL.using(connection) .fetchStream(resultSet) .map(r -> new CityRecord(r.get("NAME", String.class), r.get("COUNTRY", String.class)))]; }
|
我们使用ResultQuery类中的fetchStream()方法从ResultSet构建记录流。此外,我们将 JOOQ 记录映射到CityRecord实例,然后再从方法返回它们。
让我们调用新方法并验证其是否正确运行:
@Test void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException { Stream<CityRecord> cityRecords = jdbcStreamAPIRepository .getCitiesStreamUsingJOOQ(connection); List<CityRecord> cities = cityRecords.toList(); assertThat(cities) .containsExactly( new CityRecord("London", "United Kingdom"), new CityRecord("Sydney", "Australia"), new CityRecord("Bucharest", "Romania")); }
|
正如预期的那样,我们从流中的数据库中检索了所有城市记录。
3.使用jdbc-stream
或者,我们可以使用名为jdbc-stream 的轻量级库从ResultSet创建流。
让我们添加它的依赖项:
<dependency> <groupId>com.github.juliomarcopineda</groupId> <artifactId>jdbc-stream</artifactId> <version>0.1.1</version> </dependency>
|
现在,让我们向JDBCStreamAPIRepository添加一个新方法:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(QUERY); connection.setAutoCommit(false); preparedStatement.setFetchSize(5000); ResultSet resultSet = preparedStatement.executeQuery(); return JdbcStream.stream(resultSet) .map(r -> { try { return createCityRecord(resultSet); } catch (SQLException e) { throw new RuntimeException(e); } }); }
|
我们使用JdbcStream从ResultSet构建流。在底层,它使用 Spliterators 并使用与我们自己的实现相同的逻辑构建流。
现在,我们将检查新的存储库方法如何工作:
@Test void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException { Stream<CityRecord> cityRecords = jdbcStreamAPIRepository .getCitiesStreamUsingJdbcStream(connection); List<CityRecord> cities = cityRecords.toList(); assertThat(cities) .containsExactly( new CityRecord("London", "United Kingdom"), new CityRecord("Sydney", "Australia"), new CityRecord("Bucharest", "Romania")); }
|
我们已经使用 jdbc-stream 库获得了所有预期的项目。
关闭资源
使用 JDBC 时,我们必须关闭所有使用的资源以避免连接泄漏。常见的做法是在Connection、PreparedStatement和ResultSet周围使用try-with-resources语法。但是,在使用流时,这种方法并不合适。如果我们从存储库方法返回一个流,则所有资源都将关闭,并且流上的任何操作都将无法访问它们。
为了避免这个问题,我们需要使用 流的onClose()方法关闭所有资源。此外,我们必须确保在使用完流后关闭该流。
让我们修改我们的存储库方法以包含资源关闭逻辑:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(QUERY); connection.setAutoCommit(false); preparedStatement.setFetchSize(5000); ResultSet resultSet = preparedStatement.executeQuery(); return JdbcStream.stream(resultSet) .map(r -> { try { return createCityRecord(resultSet); } catch (SQLException e) { throw new RuntimeException(e); } }) .onClose(() -> closeResources(connection, resultSet, preparedStatement)); } private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) { try { resultSet.close(); preparedStatement.close(); connection.close(); logger.info("Resources closed"); } catch (SQLException e) { throw new RuntimeException(e); } }
|
我们添加了closeResources()方法并将其附加到onClose()流处理程序。
现在,让我们修改客户端代码以确保使用后关闭流:
@Test void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException { Stream<CityRecord> cityRecords = jdbcStreamAPIRepository .getCitiesStreamUsingJdbcStream(connection); List<CityRecord> cities = cityRecords.toList(); cityRecords.close(); assertThat(cities) .containsExactly( new CityRecord("London", "United Kingdom"), new CityRecord("Sydney", "Australia"), new CityRecord("Bucharest", "Romania")); }
|
在这里,我们在处理完所有项目后关闭流。此外,我们可以观察到一条日志消息,表明所有资源都已关闭:
[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed