使用 Java 将批量数据插入 MySQL

在本文中,我们将讨论如何将数据批量插入MySQL数据库,并且与插入每条记录相比,我们将讨论这样做的好处。

使用案例

  • 我们从营销部门收到客户的 CSV 文件,我们的任务是向他们发送新营销活动的电子邮件。
  • 作为开发人员,我们构建了流程,从该文件中读取每个客户数据,使用营销信息修改和丰富它们,并将记录插入数据库表和排队作业。
  • 然后,排队的作业会读取它们并向每个客户发送电子邮件。

首先,我们需要向 pom XML 添加数据库驱动程序,以便我们可以连接到数据库。 
将 MySQL 驱动程序添加到 pom.xml

<dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>


连接实例需要数据库的连接字符串、用户名和密码。
连接字符串如下所示: 

jdbc:mysql://{hostname/ip}:{port}/{database_schema_name}

public class JDBCConnection {
 
    public static Connection connect() throws SQLException {
       
// this is for demo but please don't setup username and password in code , instead read from config file or Vault/KMS
        Connection connection = DriverManager.getConnection(CONNECTION_STRING,
                USER, PASSWORD);
        if(connection != null){
                System.out.println(
"Connected to database with "+USER);
        }
        return connection;
    }
}

批量插入记录

  • 以下函数将 insertQuery 和要批量插入的记录作为参数。
  • 我们首先需要连接到数据库并将自动提交设置为 false。我们想要控制何时提交,因此需要将自动提交标记为 false。
  • PreparedStatement 是一种预编译查询,它非常高效,因为数据库不需要编译它们,特别是当我们有批量记录时,我们可以使用预编译查询并传递参数化参数,从而显着提高数据库性能。
  • 我们迭代每个输入记录并将其作为批次添加到准备好的语句中。
  • 然后我们可以对数据库执行批处理。

public void insertAsBatch(String insertQuery, List<Record> records) throws SQLException {
    Connection connection = JDBCConnection.connect();
    PreparedStatement preparedStatement = connection.prepareStatement(insertQuery);
    connection.setAutoCommit(false);
    records.stream().forEach(record -> {
        try {
            setPreparedStatement(record, connection, preparedStatement);
            preparedStatement.addBatch();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    int[] ints = preparedStatement.executeBatch();
    System.out.println("Number of records to be inserted : "+ record.size());
    System.out.println(
"Number of records actually inserted : "+
            Arrays.stream(ints).reduce(Integer::sum).getAsInt());
    connection.commit();
    connection.close();
}

我们需要将每条记录添加到准备好的语句中。

private static void setPreparedStatement(Record record, Connection connection, PreparedStatement preparedStatement) throws SQLException, ParseException {
        preparedStatement.setString(1, record.getProp1());
        preparedStatement.setString(2, record.getProp1());
        preparedStatement.setString(3, record.getProp1());
        preparedStatement.setString(4, record.getProp1());
}

客户端代码:

public static void main(String[] args) throws SQLException {
   // read input files and set csvRecords
   recordDao.insertAsBatch(INSERT_QUERY, csvRecords);
}

  • 在本文中,我们使用PreparedStatement将输入记录设置为批量,并将它们作为单个原子操作插入到数据库中。
  • 如果其中之一未能插入数据库,则事务将回滚。所以它赋予批量操作原子性