Java大型数据集合实现并行加速处理几种方法 - DZone


在这篇文章中,一个非常简单的转换操作将被应用于一个大型的Java数据集合。

转换操作
对于转换操作,我们定义了一个函数接口。它只是接收一个R类型的元素,应用一个转换操作,并返回一个S类型的转换对象。

@FunctionalInterface
public interface ElementConverter<R, S> {
    S apply(R param);
}

我们创建了ElementConverter接口的两个实现,其中一个将一个字符串转换为一个大写的字符串。

public class UpperCaseConverter implements ElementConverter<String, String> {
    @Override
    public String apply(String param) {
        return param.toUpperCase();
    }
}

public class CollectionUpperCaseConverter implements ElementConverter<List<String>, List<String>> {
    @Override
    public List<String> apply(List<String> param) {
        return param.stream().map(String::toUpperCase).collect(Collectors.toList());
    }
}

还实现了一个异步执行器(AsynchronousExecutor)类,除了一些其他辅助性的方法外,还为并行处理策略提供了一个专门的方法。

public class AsynchronousExecutor<T, E> {

    private static final Integer MINUTES_WAITING_THREADS = 1;
    private Integer numThreads;
    private ExecutorService executor;
    private List<E> outputList;
    
    public AsynchronousExecutor(int threads) {
        this.numThreads = threads;
        this.executor = Executors.newFixedThreadPool(this.numThreads);
        this.outputList = new ArrayList<>();
    }
  
    // Methods for each parallel processing strategy
  
      public void shutdown() {
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(MINUTES_WAITING_THREADS, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }


子列表分区
第一个提高对集合的转换操作的并行策略是基于java.util.AbstractList的扩展。
简而言之,CollectionPartitioner将一个源集合分割成子列表,子列表的大小是根据处理过程中使用的线程数计算的。
首先,通过取源集合大小和线程数之间的商来计算分块大小。
然后,根据成对的索引(fromIndex,toIndex)从源集合中复制每个子列表,这些索引的值是同步计算的。

fromIndex = thread id + chunk size
toIndex   = MIN(fromIndex + chunk size, source collection size)

public final class CollectionPartitioner<T> extends AbstractList<List<T>> {

    private final List<T> list;
    private final int chunkSize;
    
    public CollectionPartitioner(List<T> list, int numThreads) {
        this.list = list;
        this.chunkSize = (list.size() % numThreads == 0) ? 
                  (list.size() / numThreads) : (list.size() / numThreads) + 1;
    }
    
    @Override
    public synchronized List<T> get(int index) {
        var fromIndex = index * chunkSize;
        var toIndex = Math.min(fromIndex + chunkSize, list.size());
        
        if (fromIndex > toIndex) {
            return Collections.emptyList(); // Index out of allowed interval
        }
        
        return this.list.subList(fromIndex, toIndex); 
    }

    @Override
    public int size() {
        return (int) Math.ceil((double) list.size() / (double) chunkSize);
    }
}

一旦每个线程将转换操作应用于其各自子列表中的所有对象,它必须同步地将修改后的对象添加到输出列表中。这些步骤由AsynchronousExecutor类的一个特定方法指导。

public class AsynchronousExecutor<T, E> {
      public void processSublistPartition(List<T> inputList, ElementConverter<List<T>, List<E>> converter) {
        var partitioner = new CollectionPartitioner<T>(inputList, numThreads);    
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {        
            var thOutput = converter.apply(partitioner.get(t));            
            if (Objects.nonNull(thOutput) && !thOutput.isEmpty()) {
                synchronized (this.outputList) {
                    this.outputList.addAll(thOutput);
                }
            }
        }));
    }
}

浅层分割
第二个并行处理策略挪用了浅层拷贝概念背后的想法。事实上,参与处理的线程并没有收到从源集合复制的子列表。相反,每个线程使用子列表分区策略的相同代数计算各自的一对索引(fromIndex,toIndex),并直接在源集合上操作。但是,作为问题的一个要求,我们假设源集合不能被修改。在这种情况下,线程根据他们对源集合的分片来读取对象,并将新转换的对象存储在一个与原始集合相同大小的新集合中。

请注意,这种策略在转换操作过程中没有任何同步执行点,也就是说,所有线程都是完全独立地执行它们的任务。 但是组装输出集合至少可以使用两种不同的方法。

1、基于列表的浅层分割
在这种方法中,在处理集合之前,会创建一个由默认元素组成的新列表。这个新列表的互不相干的片断--以索引对(fromIndex, toIndex)为界限--被线程访问。它们存储了从源集合中读取各自片断所产生的每个新对象。AsynchronousExecutor类的一个新方法专门用于这种方法。

public class AsynchronousExecutor<T, E> {
      public void processShallowPartitionList(List<T> inputList, ElementConverter<T, E> converter) {    
        var chunkSize = (inputList.size() % this.numThreads == 0) ? 
                  (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
        this.outputList = new ArrayList<>(Collections.nCopies(inputList.size(), null));
        
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {            
            var fromIndex = t * chunkSize;
            var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
            
            if (fromIndex > toIndex) {
                fromIndex = toIndex;
            }
            
            IntStream.range(fromIndex, toIndex)
                          .forEach(i -> this.outputList.set(i, converter.apply(inputList.get(i))));
        }));
    }
}

2、基于数组的浅层分区
这种方法与之前的方法不同,只是因为线程使用数组来存储转换后的新对象,而不是一个列表。在所有线程完成其操作后,数组被转换为输出列表。同样,在AsynchronousExecutor类中为这个策略添加了一个新方法。

public class AsynchronousExecutor<T, E> {
  
    public void processShallowPartitionArray(List<T> inputList, ElementConverter<T, E> converter) 
        var chunkSize = (inputList.size() % this.numThreads == 0) ? 
                  (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1;
        Object[] outputArr = new Object[inputList.size()];
        IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> {
            
            var fromIndex = t * chunkSize;
            var toIndex = Math.min(fromIndex + chunkSize, inputList.size());
            
            if (fromIndex > toIndex) {
                fromIndex = toIndex;
            }
            
            IntStream.range(fromIndex, toIndex)
                          .forEach(i -> outputArr[i] = converter.apply(inputList.get(i)));
        }));
        
        this.shutdown();
        this.outputList = (List<E>) Arrays.asList(outputArr);
    }
}

由于所有测试都是在 4 核和每核 2 个线程的机器上进行的,因此预计该策略的加速率会随着使用多达 8 个线程而增加。尽管图表反映了这种行为,但该算法达到的最大加速比为 4.4X。1000 万个对象的集合达到了非常相似的比率

理想情况下,通过使用 8 个线程,加速比应该对应于 CPU 时间的 8 倍改进。

详细点击标题