1、基于 JDK 的虚拟线程实现:
摘自实际代码:
public static void withFlatMapUsingJDK() { ... var virtualThreadExecutor = Executors.newThreadPerTaskExecutor( Thread .ofVirtual() .name("jdk21-vt-", 0) .factory() );
try (virtualThreadExecutor) { // Submit tasks for parallel processing List<CompletableFuture<Void>> futures = users .stream() .map(user -> CompletableFuture.runAsync(() -> { try { log.info("Processing user: {}", user); processSomeBizLogic(user); successCount.incrementAndGet(); } catch (Exception e) { log.error("Error occurred while processing user {}: {}", user, e.getMessage()); failureCount.incrementAndGet(); } }, virtualThreadExecutor)) .toList(); // Collect CompletableFuture<Void> for each user
// Wait for all tasks to complete CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); try { allOf.join(); } catch (Exception e) { log.error("Error waiting for all tasks to complete: {}", e.getMessage()); } } ... }
|
2、基于 Spring Core Reactor 的虚拟线程实现
public static void withFlatMapUsingJDK() { ... // Custom executor with virtual threads var virtualThreadExecutor = Executors.newThreadPerTaskExecutor( Thread .ofVirtual() .name("rx-vt-", 0) .factory() );
try (virtualThreadExecutor) { Flux .fromIterable(objectList) .flatMap(obj -> Mono .fromCallable(() -> { log.info("Entering processUser in virtual thread: {}", obj); processSomeBizLogic(obj); log.info("Leaving processUser in virtual thread: {}", obj); successCount.incrementAndGet(); return obj; }) .doOnError(error -> { log.error("Error occurred while processing user {}: {}", obj, error.getMessage()); failureCount.incrementAndGet(); }) .onErrorResume(error -> { log.info("Skipping user due to error: {}", obj); return Mono.empty(); // Skip errored objects }) .subscribeOn(Schedulers.fromExecutor(virtualThreadExecutor)) // Use virtual threads ) .doOnComplete(() -> { log.info("Processing completed"); log.info("Success count: {}", successCount.get()); log.info("Failure count: {}", failureCount.get()); }) .blockLast(); } ... }
|
发处理列表中以下数量的对象
- 10 万对象
- 25万 个对象
- 50 万 个对象
结果:
处理整个列表所花费的总时间:
- 与 Spring Core Reactor 相比,基于 JDK 实现的虚拟线程速度超快。
- 此外,随着数据量的增加,基于 Spring Core Reactor 的应用程序处理时间缓慢呈指数增长,越来越极端慢。
内存占用:
- 对于 10 万个对象,与基于 Spring Reactor 的实现相比,基于 JDK 的实现需要在旧代Old Gen 中分配 33 倍的内存
- 对于 50 万个对象,与基于 Spring Reactor 的实现相比,基于 JDK 的实现在 旧代Old Gen 中使用的峰值内存是后者的 81 倍
GC 暂停:
- ,基于 JDK 的实现的 GC 暂停时间更长。尽管基于 JDK 的实现的 GC 暂停时间更长,但这对应用程序的延迟没有任何显著影响。
CPU 时间:
- 尽管基于 JDK 的实现需要更长的 CPU 时间来进行 GC 活动,但它不会对应用程序性能产生任何负面影响。
对象度量:
- 尽管基于 JDK 的实现的对象创建率和提升率明显更高,但它对应用程序性能的影响却微乎其微。
总结
- 对于基于虚拟线程的实现,JDK 应该是显而易见的选择,因为它们比 Spring Core Reactor快得多。
- 对于基于平台线程的实现,Spring Core Reactor比基于 JDK 的实现相对更快