这是一个Spring Boot应用程序展示案例,它读取一个相对较大的JSON文件(200000多行),并使用ForkJoinPoolAPI和HikariCP 通过批处理将其内容插入MySQL 。
关键点:
1. 使用MySQL json类型
-- Create the table CREATE TABLE `lots` ( `lot` json DEFAULT NULL );
|
2. 对于MySQL,application.properties您可能希望将以下内容附加到JDBC URL:
- rewriteBatchedStatements=true: 此设置将强制在单个请求中发送批处理语句;
- cachePrepStmts=true:如果您决定设置启用缓存和是非常有用的 prepStmtCacheSize,prepStmtCacheSqlLimit等为好; 没有此设置,缓存被禁用;
- useServerPrepStmts=true:这样你就可以切换到服务器端准备好的语句(可能会带来显着的性能提升); 此外,您可以避免PreparedStatement在JDBC驱动程序级别进行模拟;
- 我们使用以下JDBC URL设置: ...?cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true&createDatabaseIfNotExist=true
application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/citylots_db?cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true&createDatabaseIfNotExist=true spring.datasource.username=root spring.datasource.password=root
spring.jpa.hibernate.ddl-auto=create spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
spring.jpa.properties.hibernate.jdbc.batch_size = 300
spring.datasource.initialization-mode=always spring.datasource.platform=mysql
spring.datasource.hikari.maximumPoolSize=8 spring.datasource.hikari.minimumIdle=8
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG logging.level.com.zaxxer.hikari=TRACE
|
注意: 较旧的MySQL版本不能容忍将重写和服务器端预处理语句一起激活。为确保这些陈述仍然有效,请查看您正在使用的Connector / J的注释。
- 设置HikariCP以提供许多数据库连接,以确保数据库实现最小上下文切换(例如,2 * CPU核心数)
- 此应用程序用于 StopWatch测量将文件传输到数据库所需的时间
- 要运行应用程序,您必须citylots.zip在当前位置解压缩; 这是从Internet收集的相对较大的JSON文件
- 如果要查看有关批处理的详细信息,只需激活 DatasourceProxyBeanPostProcessor.java组件,取消注释 @Component; 这是必需的,因为此应用程序依赖于DataSource-Proxy
3. 将JSON文件读入一定容量的List,例如,等于或大于批处理数据大小; 默认情况下,批次大小为300行,临时列表为300 * 64(建议不要用这些值随意进行实验!)
@Repository @Transactional @Scope("prototype") public class RecursiveActionRepository extends RecursiveAction {
@Value("${spring.jpa.properties.hibernate.jdbc.batch_size}") private int batchSize;
@PersistenceContext private EntityManager entityManager; @Autowired private ApplicationContext applicationContext;
private final List<String> jsonList;
private static final Logger logger = Logger.getLogger(RecursiveActionRepository.class.getName()); private static final String SQL_INSERT = "INSERT INTO lots (lot) VALUES (?)";
public RecursiveActionRepository(List<String> jsonList) { this.jsonList = jsonList; }
@Override public void compute() { if (jsonList.size() > batchSize) { ForkJoinTask.invokeAll(createSubtasks()); } else { Session hibernateSession = entityManager.unwrap(Session.class); hibernateSession.doWork(this::insertJson); } }
private List<RecursiveActionRepository> createSubtasks() { List<RecursiveActionRepository> subtasks = new ArrayList<>();
int size = jsonList.size();
List<String> jsonListOne = jsonList.subList(0, (size + 1) / 2); List<String> jsonListTwo = jsonList.subList((size + 1) / 2, size);
subtasks.add(applicationContext.getBean( RecursiveActionRepository.class, new ArrayList<>(jsonListOne))); subtasks.add(applicationContext.getBean( RecursiveActionRepository.class, new ArrayList<>(jsonListTwo)));
return subtasks; }
public void insertJson(Connection connection) { try (PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT)) {
int i = 1; for (String jsonLine : jsonList) { preparedStatement.setString(1, jsonLine); preparedStatement.addBatch();
if (i % batchSize == 0) { preparedStatement.executeBatch(); i = 0; }
i++; }
if (i > 1) { preparedStatement.executeBatch(); } logger.log(Level.INFO, "Processed by {0}", Thread.currentThread().getName());
} catch (SQLException e) { logger.log(Level.SEVERE, "SQL exception", e); } } }
|
4. 列表减半并创建子任务见createSubtasks,直到列表大小小于批量大小(例如,默认小于300)
当列表已满时,将其分批保存到MySQL中,清除列表,然后重新填充
源代码可以在这里找到。