Java中实现合并排序的三种方法

合并排序是一种流行的排序算法,它通过将数组或列表划分为较小的子数组,对它们进行独立排序,然后将它们合并在一起来有效地对数组或列表进行排序。它以其有效性、稳定性和处理庞大数据集的能力而闻名。

简单多线程
通过在 Java 中使用多线程,我们可以将工作负载分配给多个线程并同时执行它们,我们可以提高合并排序的性能。

Java 的多线程功能允许在单个程序中执行多个线程,每个线程代表一个单独的控制流。我们可以通过使用多个线程来加速排序操作,从而利用可用的处理能力。多线程合并排序的基本概念是将输入数组分割成更小的部分,并为每个部分提供自己的线程进行排序。一旦各个部分被排序,我们将它们合并在一起以获得最终的排序数组。

输入: {9,2,5,1,7,4,8,3,6,11,15,12,13,10,14}
输出: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]

算法:
步骤1:创建一个类MultithreadedMergeSort,其私有实例变量为int[]类型的array和tempArray。

步骤2:在MultithreadedMergeSort类中实现一个公共方法sort(int[] inputArray)。该方法将负责启动排序过程。

第 3 步:在 sort() 方法内部:

  • 步骤3.1:将inputArray分配给数组实例变量。
  • 步骤3.2:初始化tempArray,其长度与inputArray相同。

步骤4:在MultithreadedMergeSort类中实现一个私有方法mergeSort(int low, int high)。此方法将使用递归处理实际的排序过程。

第 5 步:在 mergeSort() 方法内部:

  • 步骤5.1:检查low是否小于high,以确保当前子数组中有多个元素。
  • 步骤5.2:计算中间指数作为低点和高点的平均值。

第 6 步:使用 Thread 类和 lambda 表达式创建两个单独的线程,对子数组的左半部分和右半部分进行排序:

  • 步骤 6.1:创建一个 leftThread,为左半部分递归调用 mergeSort(),传递 low 和 mid 作为参数。
  • 步骤 6.2:创建一个 rightThread,为右半部分递归调用 mergeSort(),传递 mid + 1 和 high 作为参数。

步骤 7:使用 start() 方法启动两个线程。

步骤 8:在两个线程上使用 join() 方法等待其完成,然后再继续。

第 9 步:线程完成后,调用 merge() 方法来合并已排序的两半。

步骤10:在MultithreadedMergeSort类中实现一个私有方法merge(int low, int mid, int high)。此方法将两个已排序的一半合并回原始数组。

第 11 步:在 merge() 方法内部:

  • 步骤 11.1:使用 System.arraycopy() 将两半复制到临时数组中。
  • 步骤11.2:将leftIndex初始化为low,rightIndex初始化为mid + 1,currentIndex初始化为low。
  • 步骤11.3:比较两半的元素,并将它们按升序合并回原始数组。
  • 步骤 11.4:复制左半部分或右半部分的任何剩余元素(如果有)。

第12步:在main()方法中:

  • 步骤12.1:创建MultithreadedMergeSort的实例。
  • 步骤12.2:声明并初始化输入数组。
  • 步骤 12.3:在 MultithreadedMergeSort 实例上调用 sort() 方法,并传递输入数组。
  • 步骤12.4:使用Arrays.toString()打印排序后的数组。


import java.util.Arrays;  
  
public class MultithreadedMergeSort   
{  
    private int[] array;  
    private int[] tempArray;  
  
    public void sort(int[] inputArray) {  
        this.array = inputArray;  
        int length = inputArray.length;  
        this.tempArray = new int[length];  
        mergeSort(0, length - 1);  
    }  
  
    private void mergeSort(int low, int high) {  
        if (low < high) {  
            int mid = (low + high) / 2;  
  
            // Create two separate threads to sort the left and right halves  
            Thread leftThread = new Thread(() -> mergeSort(low, mid));  
            Thread rightThread = new Thread(() -> mergeSort(mid + 1, high));  
  
           
// Start both threads  
            leftThread.start();  
            rightThread.start();  
  
            try {  
               
// Wait for both threads to complete  
                leftThread.join();  
                rightThread.join();  
  
               
// Merge the sorted halves  
                merge(low,mid,high);  
            } catch (InterruptedException e)   
            {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    private void merge(int low,int mid,int high)   
    {  
       
// Copy both halves into the temporary array  
        System.arraycopy(array, low, tempArray, low, high - low + 1);  
  
        int leftIndex = low;  
        int rightIndex = mid + 1;  
        int currentIndex = low;  
  
       
// Merge the two halves back into the original array  
        while (leftIndex <= mid && rightIndex <= high)   
        {  
            if (tempArray[leftIndex]<=tempArray[rightIndex])   
            {  
                array[currentIndex]=tempArray[leftIndex];  
                leftIndex++;  
            } else   
            {  
                array[currentIndex]=tempArray[rightIndex];  
                rightIndex++;  
            }  
            currentIndex++;  
        }  
  
       
// Copy the remaining elements of the left half (if any)  
        while (leftIndex<=mid)   
        {  
            array[currentIndex]=tempArray[leftIndex];  
            leftIndex++;  
            currentIndex++;  
        }  
  
       
// Copy the remaining elements of the right half (if any)  
        while (rightIndex <= high)   
        {  
            array[currentIndex] = tempArray[rightIndex];  
            rightIndex++;  
            currentIndex++;  
        }  
    }  
  
    public static void main(String[] args) {  
        int[] array = {9,2,5,1,7,4,8,3,6,11,15,12,13,10,14};  
        MultithreadedMergeSort sorter=new MultithreadedMergeSort();  
        sorter.sort(array);  
        System.out.println(Arrays.toString(array));  
    }  
}  

输出:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

复杂性分析:
使用多线程进行合并排序的时间复杂度与普通合并排序算法相同,均为 O(n log n),其中 "n "为输入数组中的元素个数。

使用多线程进行合并排序的空间复杂度为 O(n),其中 "n "为输入数组中的元素个数。

使用 Fork/Join 框架的并行归并排序
ForkJoinPool 是 ExecutorService 的实现之一。它用于 CompletableFuture 和 Stream API。设计该类的目的是简化递归任务的并行性,方法是将单个任务分解成独立的任务,直到这些任务小到可以异步执行为止。使用该类,您可以在少量线程中执行大量任务。

让我们看看 ForkJoinTask 实现计算斐波那契数的简单示例:

public class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;

    public FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        if (n < 2) return n;
        FibonacciTask f1 = new FibonacciTask(n - 1);
        FibonacciTask f2 = new FibonacciTask(n - 2);
        f1.fork();
        return f2.compute() + f1.join();
    }
}

ForkJoinTask-合并排序基于与世界一样古老的“分而治之”原则。我们需要将源问题划分为子任务,递归地解决它们并组合结果。

public class MergeSortAction extends RecursiveAction {

    private final int[] arr;

    public MergeSortAction(int[] arr) {
        this.arr = arr;
    }

    @Override
    public void compute() {
        ...
    }

    private void merge(int[] left, int[] right) {
        ...
    }

}

接下来,我们实现 merge() 方法。它将接受 2 个数组--左部分和右部分。我们需要通过比较元素来合并这些未排序的数组,并将最小的元素赋值给相应的数组索引。当其中一个数组为空时,while 循环将中断,但仍需合并另一个非空数组部分的数据。
为此,我们需要增加两个 while 循环来获取两个数组中的所有数据:

private void merge(int[] left, int[] right) {
    int i = 0, j = 0, k = 0;
    while (i < left.length && j < right.length) {
        if (left[i] < right[j])
            arr[k++] = left[i++];
        else
            arr[k++] = right[j++];
    }
    while (i < left.length) {
        arr[k++] = left[i++];
    }
    while (j < right.length) {
        arr[k++] = right[j++];
    }
}


接下来,我们将编写 compute() 方法来递归分割原始数组,并将结果传递给 merge() 方法。为此,我们需要计算数组的中间索引。然后将原始数组分为左右两部分。为了填充这两部分,我们通过调用 System.arraycopy(Object src, int srcPos, Object dest, int destPos, int length) 复制原始数据:

@Override
public void compute() {
    if (arr.length < 2) return;
    int mid = arr.length / 2;

    int[] left = new int[mid];
    System.arraycopy(arr, 0, left, 0, mid);

    int[] right = new int[arr.length - mid];
    System.arraycopy(arr, mid, right, 0, arr.length - mid);

    ...
}


现在,我们有了两个独立的数组。让我们通过创建新的 MergeSortAction 任务对它们进行递归划分,并通过将它们传入 invokeAll() 方法异步运行它们。我们使用 merge() 方法对两个数组进行排序和合并:

@Override
public void compute() {
    if (arr.length < 2) return;
    int mid = arr.length / 2;

    int[] left = new int[mid];
    System.arraycopy(arr, 0, left, 0, mid);

    int[] right = new int[arr.length - mid];
    System.arraycopy(arr, mid, right, 0, arr.length - mid);

    invokeAll(new MergeSortAction(left), new MergeSortAction(right));
    merge(left, right);
}

 GitHub. source code 


为什么在合并排序中 Streams 比 ForkJoinPool 更快?

有人比较了不同的 Java 并发机制。从运行的测试来看,Java 8 流在合并排序算法中似乎比 Java 7 ForkJoinPool 机制更快,后者是在考虑分而治之问题的情况下创建的。

这是结果(数百万个元素/时间(以秒为单位)):

+-----+---------+--------------+
|     | Streams | ForkJoinPool |
+-----+---------+--------------+
| 1M  | 0.172s  | 0.182s       |
| 2M  | 0.36s   | 0.346s       |
| 3M  | 0.547s  | 0.713s       |
| 4M  | 0.746s  | 0.618s       |
| 5M  | 0.95s   | 1.193s       |
| 6M  | 1.206s  | 1.078s       |
| 7M  | 1.362s  | 1.324s       |
| 8M  | 1.636s  | 1.416s       |
| 9M  | 1.874s  | 1.705s       |
| 10M | 2.133s  | 2.858s       |
+-----+---------+--------------+


流:
 

public static void sort(int[] numbers) {
     int N = numbers.length;
     int[] temp = new int[N];
     IntStream.range(1, N)
        .filter(n -> (n & -n) == n) // power of 2
        .flatMap(n -> IntStream.iterate(0, i -> i + 2 * n)
        .limit((N - n) / (2 * n) + 1)
        .parallel()
        .map(i -> merge(numbers, temp, i, i + n - 1, Math.min(i + n + n - 1, N - 1))))
    .toArray();
  }


merge方法:

private static void merge(int[] a, int[] temp, int low, int mid, int high) {
    for (int i = low; i <= high; i++) {
      temp[i] = a[i];
    }
    int i = low, j = mid + 1;
    for (int k = low; k <= high; k++) {
      if (i > mid) {
        a[k] = temp[j++];
      } else if (j > high) {
        a[k] = temp[i++];
      } else if (temp[i] < temp[j]) {
        a[k] = temp[i++];
      } else {
        a[k] = temp[j++];
      }
    }
}


ForkJoinPool主要方法:

@Override
protected void compute() {
  final int range = end - start;
  if (range > blockSize) {
    final int midPoint = start + (range / 2);
    final ForkingAction left = new ForkingAction(start, midPoint, blockSize);
    left.fork();
    final ForkingAction right = new ForkingAction(midPoint + 1, end, blockSize);
    right.compute();
    left.join();
    merge(start, end);
  } else {
    sortSeq(start, end);
  }
}

一种解释:Java7 中的 Join() 创建延续线程,而连接线程等待连接完成。在Java8中,线程只是等待。

Stream 使用 CountedCompleter 类,该类不使用 join()。自 2011 年以来,我一直在撰写对该框架的持续批评。

有关 Java8 的文章在这里如果您用 CountedCompleter 替换 RecursiveTask 那么您应该会得到更好的结果。使用 CountedCompeter 类是很困难的。