如何通过ForkJoinPool和HikariCP将大型JSON文件批量处理到MySQL?


这是一个Spring Boot应用程序展示案例,它读取一个相对较大的JSON文件(200000多行),并使用ForkJoinPoolAPI和HikariCP 通过批处理将其内容插入MySQL 。

关键点:
1.  使用MySQL  json类型

-- Create the table 
CREATE TABLE `lots` (
  `lot` json DEFAULT NULL
);

2. 对于MySQL,application.properties您可能希望将以下内容附加到JDBC URL:

  • rewriteBatchedStatements=true: 此设置将强制在单个请求中发送批处理语句;
  • cachePrepStmts=true:如果您决定设置启用缓存和是非常有用的  prepStmtCacheSize,prepStmtCacheSqlLimit等为好; 没有此设置,缓存被禁用;
  • useServerPrepStmts=true:这样你就可以切换到服务器端准备好的语句(可能会带来显着的性能提升); 此外,您可以避免PreparedStatement在JDBC驱动程序级别进行模拟;
  • 我们使用以下JDBC URL设置: ...?cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true&createDatabaseIfNotExist=true

application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/citylots_db?cachePrepStmts=true&useServerPrepStmts=true&rewriteBatchedStatements=true&createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=root

spring.jpa.hibernate.ddl-auto=create
spring.jpa.show-sql=true

spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect

spring.jpa.properties.hibernate.jdbc.batch_size = 300

spring.datasource.initialization-mode=always
spring.datasource.platform=mysql

spring.datasource.hikari.maximumPoolSize=8
spring.datasource.hikari.minimumIdle=8

logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.com.zaxxer.hikari=TRACE

注意:  较旧的MySQL版本不能容忍将重写和服务器端预处理语句一起激活。为确保这些陈述仍然有效,请查看您正在使用的Connector / J的注释。

  • 设置HikariCP以提供许多数据库连接,以确保数据库实现最小上下文切换(例如,2 * CPU核心数)
  • 此应用程序用于  StopWatch测量将文件传输到数据库所需的时间
  • 要运行应用程序,您必须citylots.zip在当前位置解压缩; 这是从Internet收集的相对较大的JSON文件
  • 如果要查看有关批处理的详细信息,只需激活  DatasourceProxyBeanPostProcessor.java组件,取消注释  @Component; 这是必需的,因为此应用程序依赖于DataSource-Proxy

3. 将JSON文件读入一定容量的List,例如,等于或大于批处理数据大小; 默认情况下,批次大小为300行,临时列表为300 * 64(建议不要用这些值随意进行实验!)


@Repository
@Transactional
@Scope("prototype")
public class RecursiveActionRepository extends RecursiveAction {

    @Value(
"${spring.jpa.properties.hibernate.jdbc.batch_size}")
    private int batchSize;

    @PersistenceContext
    private EntityManager entityManager;
    
    @Autowired
    private ApplicationContext applicationContext;

    private final List<String> jsonList;

    private static final Logger logger = Logger.getLogger(RecursiveActionRepository.class.getName());
    private static final String SQL_INSERT =
"INSERT INTO lots (lot) VALUES (?)";

    public RecursiveActionRepository(List<String> jsonList) {
        this.jsonList = jsonList;
    }   

    @Override
    public void compute() {
        if (jsonList.size() > batchSize) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            Session hibernateSession = entityManager.unwrap(Session.class);
            hibernateSession.doWork(this::insertJson);
        }
    }

    private List<RecursiveActionRepository> createSubtasks() {
        List<RecursiveActionRepository> subtasks = new ArrayList<>();

        int size = jsonList.size();

        List<String> jsonListOne = jsonList.subList(0, (size + 1) / 2);
        List<String> jsonListTwo = jsonList.subList((size + 1) / 2, size);

        subtasks.add(applicationContext.getBean(
                RecursiveActionRepository.class, new ArrayList<>(jsonListOne)));
        subtasks.add(applicationContext.getBean(
                RecursiveActionRepository.class, new ArrayList<>(jsonListTwo)));

        return subtasks;
    }

    public void insertJson(Connection connection) {
        try (PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT)) {

            int i = 1;
            for (String jsonLine : jsonList) {
                preparedStatement.setString(1, jsonLine);
                preparedStatement.addBatch();

                if (i % batchSize == 0) {
                    preparedStatement.executeBatch();
                    i = 0;
                }

                i++;
            }

            if (i > 1) {
                preparedStatement.executeBatch();
            }
            
            logger.log(Level.INFO,
"Processed by {0}", Thread.currentThread().getName());

        } catch (SQLException e) {
            logger.log(Level.SEVERE,
"SQL exception", e);
        }
    }
}

4. 列表减半并创建子任务见createSubtasks,直到列表大小小于批量大小(例如,默认小于300)
当列表已满时,将其分批保存到MySQL中,清除列表,然后重新填充

源代码可以在这里找到。