在本文中,我们将讨论如何将数据批量插入MySQL数据库,并且与插入每条记录相比,我们将讨论这样做的好处。
使用案例
- 我们从营销部门收到客户的 CSV 文件,我们的任务是向他们发送新营销活动的电子邮件。
- 作为开发人员,我们构建了流程,从该文件中读取每个客户数据,使用营销信息修改和丰富它们,并将记录插入数据库表和排队作业。
- 然后,排队的作业会读取它们并向每个客户发送电子邮件。
首先,我们需要向 pom XML 添加数据库驱动程序,以便我们可以连接到数据库。
将 MySQL 驱动程序添加到 pom.xml
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency>
|
连接实例需要数据库的连接字符串、用户名和密码。
连接字符串如下所示:
jdbc:mysql://{hostname/ip}:{port}/{database_schema_name}
public class JDBCConnection { public static Connection connect() throws SQLException { // this is for demo but please don't setup username and password in code , instead read from config file or Vault/KMS Connection connection = DriverManager.getConnection(CONNECTION_STRING, USER, PASSWORD); if(connection != null){ System.out.println("Connected to database with "+USER); } return connection; } }
|
批量插入记录
- 以下函数将 insertQuery 和要批量插入的记录作为参数。
- 我们首先需要连接到数据库并将自动提交设置为 false。我们想要控制何时提交,因此需要将自动提交标记为 false。
- PreparedStatement 是一种预编译查询,它非常高效,因为数据库不需要编译它们,特别是当我们有批量记录时,我们可以使用预编译查询并传递参数化参数,从而显着提高数据库性能。
- 我们迭代每个输入记录并将其作为批次添加到准备好的语句中。
- 然后我们可以对数据库执行批处理。
public void insertAsBatch(String insertQuery, List<Record> records) throws SQLException { Connection connection = JDBCConnection.connect(); PreparedStatement preparedStatement = connection.prepareStatement(insertQuery); connection.setAutoCommit(false); records.stream().forEach(record -> { try { setPreparedStatement(record, connection, preparedStatement); preparedStatement.addBatch(); } catch (Exception e) { e.printStackTrace(); } }); int[] ints = preparedStatement.executeBatch(); System.out.println("Number of records to be inserted : "+ record.size()); System.out.println("Number of records actually inserted : "+ Arrays.stream(ints).reduce(Integer::sum).getAsInt()); connection.commit(); connection.close(); }
|
我们需要将每条记录添加到准备好的语句中。
private static void setPreparedStatement(Record record, Connection connection, PreparedStatement preparedStatement) throws SQLException, ParseException { preparedStatement.setString(1, record.getProp1()); preparedStatement.setString(2, record.getProp1()); preparedStatement.setString(3, record.getProp1()); preparedStatement.setString(4, record.getProp1()); }
|
客户端代码:
public static void main(String[] args) throws SQLException { // read input files and set csvRecords recordDao.insertAsBatch(INSERT_QUERY, csvRecords); }
|
- 在本文中,我们使用PreparedStatement将输入记录设置为批量,并将它们作为单个原子操作插入到数据库中。
- 如果其中之一未能插入数据库,则事务将回滚。所以它赋予批量操作原子性