Java中两种基础并行处理方式

并行处理是一种计算技术,其中多个任务或进程同时执行,将它们分解为可以同时处理的更小的子任务。并行处理不是一次处理一项任务,而是可以同时执行多个任务,从而提高性能和效率。

在并行处理系统中,复杂的任务被分为更小的独立部分,然后分配给多个处理器或内核。每个处理器同时处理其分配的任务,从而可以更快地执行和完成整个任务。这种方法与串行处理形成对比,串行处理中任务一个接一个地执行,可能会导致处理时间更长。

并行处理对于可以轻松划分为较小组件且不严重依赖彼此结果的任务特别有利。并行处理的常见应用包括科学模拟、数据分析、图像处理以及金融和工程等领域的各种计算任务。

并行处理可以使用不同的体系结构来实现,包括共享内存系统、分布式系统以及结合两者元素的混合系统。目标是利用多个处理器的集体计算能力来实现更快、更高效的计算,最终提高应用程序和系统的整体性能。

1、使用ForkJoinPool进行并行处理

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
 
public class ParallelProcessingExample {
     
    public static void main(String[] args) {
        int[] numbers = IntStream.rangeClosed(1, 10).toArray();
         
        // Create a ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
         
       
// Invoke parallel computation using RecursiveTask
        int sum = forkJoinPool.invoke(new SquareSumTask(numbers, 0, numbers.length));
         
       
// Print the final result
        System.out.println(
"Sum of numbers: " + sum);
    }
 
   
// RecursiveTask to compute the square sum in parallel
    static class SquareSumTask extends RecursiveTask<Integer> {
        private final int[] numbers;
        private final int start;
        private final int end;
 
        SquareSumTask(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
 
        @Override
        protected Integer compute() {
            if (end - start <= 1) {
                return numbers[start] * numbers[start];
            } else {
                int mid = (start + end) / 2;
                SquareSumTask leftTask = new SquareSumTask(numbers, start, mid);
                SquareSumTask rightTask = new SquareSumTask(numbers, mid, end);
 
               
// 分叉任务
                leftTask.fork();
                int rightResult = rightTask.compute();
 
               
// 加入结果
                int leftResult = leftTask.join();
 
               
// 合并结果
                return leftResult + rightResult;
            }
        }
    }
}

  • 首先使用 IntStream 创建一个从 1 到 10 的数字数组。
  • 然后创建一个 ForkJoinPool,以有效管理并行计算。
  • 主要计算由一个 SquareSumTask 执行,它是一个递归任务(RecursiveTask),可递归地将数组划分为子任务,直到达到基数为止。
  • Forking 分叉和连接joining 任务允许子任务并行执行,从而提高了性能。
  • 最终结果将被打印出来,展示并行计算的平方和。

例 2:使用 CompletableFuture 的并行流

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
 
public class ParallelProcessingExample {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
        // 使用 CompletableFuture 执行并行计算
        List<CompletableFuture<Integer>> futures = numbers.stream()
                .map(number -> CompletableFuture.supplyAsync(() -> compute(number)))
                .collect(Collectors.toList());
 
       
// 等待所有计算完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allFutures.join();
 
       
//汇总结果
        int sum = futures.stream()
                .map(CompletableFuture::join)
                .reduce(0, Integer::sum);
 
       
// Print the final result
        System.out.println(
"Sum of numbers: " + sum);
    }
 
   
// Example computation method
    private static int compute(int number) {
       
// Simulate some time-consuming computation
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
       
// Return the result
        return number * number;
    }
}

  • 程序首先创建一个整数列表,代表从 1 到 10 的数字。
  • CompletableFuture 用于表示并行计算,允许简洁地表达异步任务。
  • supplyAsync 方法用于异步计算每个数字的平方,从而促进并行性。
  • allOf 方法将所有 CompletableFuture 对象合并为一个对象,以便等待所有计算完成。
  • 然后,通过将每个 CompletableFuture 映射到其结果并还原它们来计算平方和,从而汇总结果。
  • 最终结果将被打印出来,展示并行计算的平方和汇总结果。

3、并行集合(Parallel Collections):

  • Java 8引入了并行流(Parallel Streams)和并行集合,使得对集合进行操作时可以并行处理。
  • 可以通过parallelStream()方法将普通流转换为并行流,从而在多核处理器上并行执行操作。

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5);

// 使用并行流
myList.parallelStream().forEach(System.out::println);

背后使用了Fork-Join机制

总结
在本文中,我们开始了一段 Java 并行处理之旅,探索两个不同的示例,展示并发计算的强大功能。从利用 ForkJoinPool 完成递归任务,到利用 CompletableFuture 和并行流的优越性,这些示例展示了通过并行优化性能的艺术。

在您深入研究并行处理时,请考虑每种方法的细微差别,以及它们如何与您的应用需求保持一致。无论是使用 ForkJoinPool 将任务划分为子问题,还是接受 CompletableFuture 的简单性,并行世界都在等待您的探索。