使用Stream API处理JDBC结果集3种方法

在本文中,我们探讨了使用 Stream API 操作ResultSet的几种方法。这种方法在处理无法一次性加载到内存中的大型数据集时特别有用。此外,如果我们在应用程序中遵循函数式风格,流式存储库将与我们的逻辑很好地保持一致。

遍历ResultSet是从 JDBC 查询中检索数据的常用方法。但是,在某些情况下,我们可能更喜欢使用记录流。

在本文中,我们将探讨使用Stream API处理ResultSet 的几种方法。

1. 使用拆分器
我们将从纯 JDK 方法开始,使用Spliterators来创建流。

首先,让我们为实体定义一个模型:

public record CityRecord(String city, String country) {
}

在我们的CityRecord中,我们存储了有关该城市及其国家的信息。

接下来,让我们创建一个与数据库交互并返回CityRecord实例流的存储库:

public class JDBCStreamAPIRepository {
    private static final String QUERY = "SELECT name, country FROM cities";
    private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class);
    public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection)
            throws SQLException {
        PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
        connection.setAutoCommit(false);
        preparedStatement.setFetchSize(5000);
        ResultSet resultSet = preparedStatement.executeQuery();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>(
          Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super CityRecord> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createCityRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false);
    }
    private CityRecord createCityRecord(ResultSet resultSet) throws SQLException {
        return new CityRecord(resultSet.getString(1), resultSet.getString(2));
    }
}

我们创建了一个PreparedStatement来检索cities表中的所有项目,并指定提取大小以控制内存消耗。我们使用AbstractSpliterator来生成流,只要ResultSet有更多值,就会生成新记录。此外,我们使用createCityRecord方法将每一行映射到CityRecord。

最后,让我们为我们的存储库编写一个测试:

public class JDBCResultSetWithStreamAPIUnitTest {
    private static Connection connection = null;
    private static final String JDBC_URL = "jdbc:h2:mem:testDatabase";
    private static final String USERNAME = "dbUser";
    private static final String PASSWORD = "dbPassword";
    JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository();
    @BeforeEach
    void setup() throws Exception {
        connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
        initialDataSetup();
    }
    private void initialDataSetup() throws SQLException {
        Statement statement = connection.createStatement();
        String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))";
        statement.execute(ddlQuery);
        List<String> sqlQueryList = Arrays.asList(
          "INSERT INTO cities VALUES ('London', 'United Kingdom')",
          "INSERT INTO cities VALUES ('Sydney', 'Australia')",
          "INSERT INTO cities VALUES ('Bucharest', 'Romania')"
        );
        for (String query : sqlQueryList) {
            statement.execute(query);
        }
    }
    @Test
    void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException {
        Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
          .getCitiesStreamUsingSpliterator(connection);
        List<CityRecord> cities = cityRecords.toList();
        assertThat(cities)
          .containsExactly(
            new CityRecord("London", "United Kingdom"),
            new CityRecord("Sydney", "Australia"),
            new CityRecord("Bucharest", "Romania"));
    }

我们建立与H2 数据库的连接,并在测试之前准备包含一些条目的城市表。最后,我们验证我们的存储库是否以流的形式从表中返回所有预期项目。

2.使用 JOOQ
JOOQ是一个用于处理关系数据库的流行库。它已经提供了从ResultSet检索记录流的方法。

让我们首先添加必要的依赖项:

<dependency>
    <groupId>org.jooq</groupId>
    <artifactId>jooq</artifactId>
    <version>3.19.11</version>
</dependency>

接下来,让我们向JDBCStreamAPIRepository添加一个新方法:

public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
        throws SQLException {
    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();
    return DSL.using(connection)
      .fetchStream(resultSet)
      .map(r -> new CityRecord(r.get("NAME", String.class),
        r.get("COUNTRY", String.class)))];
}

我们使用ResultQuery类中的fetchStream()方法从ResultSet构建记录流。此外,我们将 JOOQ 记录映射到CityRecord实例,然后再从方法返回它们。

让我们调用新方法并验证其是否正确运行:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {
    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
      .getCitiesStreamUsingJOOQ(connection);
    List<CityRecord> cities = cityRecords.toList();
    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

正如预期的那样,我们从流中的数据库中检索了所有城市记录。

3.使用jdbc-stream
或者,我们可以使用名为jdbc-stream 的轻量级库从ResultSet创建流。

让我们添加它的依赖项:

<dependency>
    <groupId>com.github.juliomarcopineda</groupId>
    <artifactId>jdbc-stream</artifactId>
    <version>0.1.1</version>
</dependency>

现在,让我们向JDBCStreamAPIRepository添加一个新方法:

public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
        throws SQLException {
    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();
    return JdbcStream.stream(resultSet)
      .map(r -> {
          try {
              return createCityRecord(resultSet);
          } catch (SQLException e) {
              throw new RuntimeException(e);
          }
      });
}

我们使用JdbcStream从ResultSet构建流。在底层,它使用 Spliterators 并使用与我们自己的实现相同的逻辑构建流。

现在,我们将检查新的存储库方法如何工作:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
            .getCitiesStreamUsingJdbcStream(connection);
    List<CityRecord> cities = cityRecords.toList();
    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

我们已经使用 jdbc-stream 库获得了所有预期的项目。

关闭资源
使用 JDBC 时,我们必须关闭所有使用的资源以避免连接泄漏。常见的做法是在Connection、PreparedStatement和ResultSet周围使用try-with-resources语法。但是,在使用流时,这种方法并不合适。如果我们从存储库方法返回一个流,则所有资源都将关闭,并且流上的任何操作都将无法访问它们。

为了避免这个问题,我们需要使用 流的onClose()方法关闭所有资源。此外,我们必须确保在使用完流后关闭该流。

让我们修改我们的存储库方法以包含资源关闭逻辑:

public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
        throws SQLException {
    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();
    return JdbcStream.stream(resultSet)
      .map(r -> {
          try {
              return createCityRecord(resultSet);
          } catch (SQLException e) {
              throw new RuntimeException(e);
          }
      })
      .onClose(() -> closeResources(connection, resultSet, preparedStatement));
}
private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
    try {
        resultSet.close();
        preparedStatement.close();
        connection.close();
        logger.info("Resources closed");
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

我们添加了closeResources()方法并将其附加到onClose()流处理程序。

现在,让我们修改客户端代码以确保使用后关闭流:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
            .getCitiesStreamUsingJdbcStream(connection);
    List<CityRecord> cities = cityRecords.toList();
    cityRecords.close();
    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

在这里,我们在处理完所有项目后关闭流。此外,我们可以观察到一条日志消息,表明所有资源都已关闭:

[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed