使用Speedment实现并行数据库流

Speedment是开源Stream ORM Java工具包,能够将数据库表转为Java 8流,可以使用它根据现有数据库生成POJO, 支持并行数据库流,支持不同并行策略优化性能。

并行行数据库流通常比顺序流明显快很多,Java 8为我们带来了流Stream,Stream的一个优点是容易使得流操作并行化,获取任何流,应用方法parallel(),我们就能得到一个并行流,而不是默认的顺序流,并行流是使用 ForkJoinPool执行。

如果几个工作任务非常松耦合无关,那么使用并行流效果很好,将工作任务切分到几个线程中运行的过程就不那么费劲,同样,将并行计算结果合在一起的努力也减轻一些。并行流适合CPU密集型任务。

假设数据表:


CREATE TABLE `prime_candidate` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`value` bigint(20) NOT NULL,
`prime` bit(1) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;

使用并行流访问:


final JavapotApplication app = new JavapotApplicationBuilder()
.withPassword("javapot") // Replace with the real password
.withLogging(LogType.STREAM)
.build();

final Manager<PrimeCandidate> candidates = app.getOrThrow(PrimeCandidateManager.class);

candidates.stream()
.parallel()
.filter(PrimeCandidate.PRIME.isNull())
.map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue())))
.forEach(candidates.updater());

Speedment会使用Java默认并行行为(在Spliterators::spliteratorUnknownSize定义)对非计算密集型操作进行了优化,如果我们分析java的默认的并行化的行为,我们将发现它将使用第一个线程服务第一批1024工作项目,使用第二个线程为以后2 * 1024 = 2048的工作项服务,然后以此类推3 * 1024 = 3072项。

这种方式在某些情况下并不好,需要我们自己优化并行策略或定制ForkJoin。参考:Work with Parallel Database Streams using Java 8

Speedment