合并排序是一种流行的排序算法,它通过将数组或列表划分为较小的子数组,对它们进行独立排序,然后将它们合并在一起来有效地对数组或列表进行排序。它以其有效性、稳定性和处理庞大数据集的能力而闻名。
简单多线程
通过在 Java 中使用多线程,我们可以将工作负载分配给多个线程并同时执行它们,我们可以提高合并排序的性能。
Java 的多线程功能允许在单个程序中执行多个线程,每个线程代表一个单独的控制流。我们可以通过使用多个线程来加速排序操作,从而利用可用的处理能力。多线程合并排序的基本概念是将输入数组分割成更小的部分,并为每个部分提供自己的线程进行排序。一旦各个部分被排序,我们将它们合并在一起以获得最终的排序数组。
输入: {9,2,5,1,7,4,8,3,6,11,15,12,13,10,14}
输出: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
算法:
步骤1:创建一个类MultithreadedMergeSort,其私有实例变量为int[]类型的array和tempArray。
步骤2:在MultithreadedMergeSort类中实现一个公共方法sort(int[] inputArray)。该方法将负责启动排序过程。
第 3 步:在 sort() 方法内部:
- 步骤3.1:将inputArray分配给数组实例变量。
- 步骤3.2:初始化tempArray,其长度与inputArray相同。
步骤4:在MultithreadedMergeSort类中实现一个私有方法mergeSort(int low, int high)。此方法将使用递归处理实际的排序过程。
第 5 步:在 mergeSort() 方法内部:
- 步骤5.1:检查low是否小于high,以确保当前子数组中有多个元素。
- 步骤5.2:计算中间指数作为低点和高点的平均值。
第 6 步:使用 Thread 类和 lambda 表达式创建两个单独的线程,对子数组的左半部分和右半部分进行排序:
- 步骤 6.1:创建一个 leftThread,为左半部分递归调用 mergeSort(),传递 low 和 mid 作为参数。
- 步骤 6.2:创建一个 rightThread,为右半部分递归调用 mergeSort(),传递 mid + 1 和 high 作为参数。
步骤 7:使用 start() 方法启动两个线程。
步骤 8:在两个线程上使用 join() 方法等待其完成,然后再继续。
第 9 步:线程完成后,调用 merge() 方法来合并已排序的两半。
步骤10:在MultithreadedMergeSort类中实现一个私有方法merge(int low, int mid, int high)。此方法将两个已排序的一半合并回原始数组。
第 11 步:在 merge() 方法内部:
- 步骤 11.1:使用 System.arraycopy() 将两半复制到临时数组中。
- 步骤11.2:将leftIndex初始化为low,rightIndex初始化为mid + 1,currentIndex初始化为low。
- 步骤11.3:比较两半的元素,并将它们按升序合并回原始数组。
- 步骤 11.4:复制左半部分或右半部分的任何剩余元素(如果有)。
第12步:在main()方法中:
- 步骤12.1:创建MultithreadedMergeSort的实例。
- 步骤12.2:声明并初始化输入数组。
- 步骤 12.3:在 MultithreadedMergeSort 实例上调用 sort() 方法,并传递输入数组。
- 步骤12.4:使用Arrays.toString()打印排序后的数组。
import java.util.Arrays; public class MultithreadedMergeSort { private int[] array; private int[] tempArray; public void sort(int[] inputArray) { this.array = inputArray; int length = inputArray.length; this.tempArray = new int[length]; mergeSort(0, length - 1); } private void mergeSort(int low, int high) { if (low < high) { int mid = (low + high) / 2; // Create two separate threads to sort the left and right halves Thread leftThread = new Thread(() -> mergeSort(low, mid)); Thread rightThread = new Thread(() -> mergeSort(mid + 1, high)); // Start both threads leftThread.start(); rightThread.start(); try { // Wait for both threads to complete leftThread.join(); rightThread.join(); // Merge the sorted halves merge(low,mid,high); } catch (InterruptedException e) { e.printStackTrace(); } } } private void merge(int low,int mid,int high) { // Copy both halves into the temporary array System.arraycopy(array, low, tempArray, low, high - low + 1); int leftIndex = low; int rightIndex = mid + 1; int currentIndex = low; // Merge the two halves back into the original array while (leftIndex <= mid && rightIndex <= high) { if (tempArray[leftIndex]<=tempArray[rightIndex]) { array[currentIndex]=tempArray[leftIndex]; leftIndex++; } else { array[currentIndex]=tempArray[rightIndex]; rightIndex++; } currentIndex++; } // Copy the remaining elements of the left half (if any) while (leftIndex<=mid) { array[currentIndex]=tempArray[leftIndex]; leftIndex++; currentIndex++; } // Copy the remaining elements of the right half (if any) while (rightIndex <= high) { array[currentIndex] = tempArray[rightIndex]; rightIndex++; currentIndex++; } } public static void main(String[] args) { int[] array = {9,2,5,1,7,4,8,3,6,11,15,12,13,10,14}; MultithreadedMergeSort sorter=new MultithreadedMergeSort(); sorter.sort(array); System.out.println(Arrays.toString(array)); } }
|
输出:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]复杂性分析:
使用多线程进行合并排序的时间复杂度与普通合并排序算法相同,均为 O(n log n),其中 "n "为输入数组中的元素个数。
使用多线程进行合并排序的空间复杂度为 O(n),其中 "n "为输入数组中的元素个数。
使用 Fork/Join 框架的并行归并排序
ForkJoinPool 是 ExecutorService 的实现之一。它用于 CompletableFuture 和 Stream API。设计该类的目的是简化递归任务的并行性,方法是将单个任务分解成独立的任务,直到这些任务小到可以异步执行为止。使用该类,您可以在少量线程中执行大量任务。
让我们看看 ForkJoinTask 实现计算斐波那契数的简单示例:
public class FibonacciTask extends RecursiveTask<Integer> { private final int n;
public FibonacciTask(int n) { this.n = n; }
@Override public Integer compute() { if (n < 2) return n; FibonacciTask f1 = new FibonacciTask(n - 1); FibonacciTask f2 = new FibonacciTask(n - 2); f1.fork(); return f2.compute() + f1.join(); } }
|
ForkJoinTask-合并排序基于与世界一样古老的“分而治之”原则。我们需要将源问题划分为子任务,递归地解决它们并组合结果。
public class MergeSortAction extends RecursiveAction {
private final int[] arr;
public MergeSortAction(int[] arr) { this.arr = arr; }
@Override public void compute() { ... }
private void merge(int[] left, int[] right) { ... }
}
|
接下来,我们实现 merge() 方法。它将接受 2 个数组--左部分和右部分。我们需要通过比较元素来合并这些未排序的数组,并将最小的元素赋值给相应的数组索引。当其中一个数组为空时,while 循环将中断,但仍需合并另一个非空数组部分的数据。
为此,我们需要增加两个 while 循环来获取两个数组中的所有数据:
private void merge(int[] left, int[] right) { int i = 0, j = 0, k = 0; while (i < left.length && j < right.length) { if (left[i] < right[j]) arr[k++] = left[i++]; else arr[k++] = right[j++]; } while (i < left.length) { arr[k++] = left[i++]; } while (j < right.length) { arr[k++] = right[j++]; } }
|
接下来,我们将编写 compute() 方法来递归分割原始数组,并将结果传递给 merge() 方法。为此,我们需要计算数组的中间索引。然后将原始数组分为左右两部分。为了填充这两部分,我们通过调用 System.arraycopy(Object src, int srcPos, Object dest, int destPos, int length) 复制原始数据:
@Override public void compute() { if (arr.length < 2) return; int mid = arr.length / 2;
int[] left = new int[mid]; System.arraycopy(arr, 0, left, 0, mid);
int[] right = new int[arr.length - mid]; System.arraycopy(arr, mid, right, 0, arr.length - mid);
... }
|
现在,我们有了两个独立的数组。让我们通过创建新的 MergeSortAction 任务对它们进行递归划分,并通过将它们传入 invokeAll() 方法异步运行它们。我们使用 merge() 方法对两个数组进行排序和合并:
@Override public void compute() { if (arr.length < 2) return; int mid = arr.length / 2;
int[] left = new int[mid]; System.arraycopy(arr, 0, left, 0, mid);
int[] right = new int[arr.length - mid]; System.arraycopy(arr, mid, right, 0, arr.length - mid);
invokeAll(new MergeSortAction(left), new MergeSortAction(right)); merge(left, right); }
|
GitHub. source code
为什么在合并排序中 Streams 比 ForkJoinPool 更快?
有人比较了不同的 Java 并发机制。从运行的测试来看,Java 8 流在合并排序算法中似乎比 Java 7 ForkJoinPool 机制更快,后者是在考虑分而治之问题的情况下创建的。
这是结果(数百万个元素/时间(以秒为单位)):
+-----+---------+--------------+ | | Streams | ForkJoinPool | +-----+---------+--------------+ | 1M | 0.172s | 0.182s | | 2M | 0.36s | 0.346s | | 3M | 0.547s | 0.713s | | 4M | 0.746s | 0.618s | | 5M | 0.95s | 1.193s | | 6M | 1.206s | 1.078s | | 7M | 1.362s | 1.324s | | 8M | 1.636s | 1.416s | | 9M | 1.874s | 1.705s | | 10M | 2.133s | 2.858s | +-----+---------+--------------+
|
流:
public static void sort(int[] numbers) { int N = numbers.length; int[] temp = new int[N]; IntStream.range(1, N) .filter(n -> (n & -n) == n) // power of 2 .flatMap(n -> IntStream.iterate(0, i -> i + 2 * n) .limit((N - n) / (2 * n) + 1) .parallel() .map(i -> merge(numbers, temp, i, i + n - 1, Math.min(i + n + n - 1, N - 1)))) .toArray(); }
|
merge方法:
private static void merge(int[] a, int[] temp, int low, int mid, int high) { for (int i = low; i <= high; i++) { temp[i] = a[i]; } int i = low, j = mid + 1; for (int k = low; k <= high; k++) { if (i > mid) { a[k] = temp[j++]; } else if (j > high) { a[k] = temp[i++]; } else if (temp[i] < temp[j]) { a[k] = temp[i++]; } else { a[k] = temp[j++]; } } }
|
ForkJoinPool主要方法:
@Override protected void compute() { final int range = end - start; if (range > blockSize) { final int midPoint = start + (range / 2); final ForkingAction left = new ForkingAction(start, midPoint, blockSize); left.fork(); final ForkingAction right = new ForkingAction(midPoint + 1, end, blockSize); right.compute(); left.join(); merge(start, end); } else { sortSeq(start, end); } }
|
一种解释:Java7 中的 Join() 创建延续线程,而连接线程等待连接完成。在Java8中,线程只是等待。
Stream 使用 CountedCompleter 类,该类不使用 join()。自 2011 年以来,我一直在撰写对该框架的持续批评。
有关 Java8 的文章在这里如果您用 CountedCompleter 替换 RecursiveTask 那么您应该会得到更好的结果。使用 CountedCompeter 类是很困难的。