在这篇文章中,一个非常简单的转换操作将被应用于一个大型的Java数据集合。
转换操作
对于转换操作,我们定义了一个函数接口。它只是接收一个R类型的元素,应用一个转换操作,并返回一个S类型的转换对象。
@FunctionalInterface public interface ElementConverter<R, S> { S apply(R param); }
|
我们创建了ElementConverter接口的两个实现,其中一个将一个字符串转换为一个大写的字符串。
public class UpperCaseConverter implements ElementConverter<String, String> { @Override public String apply(String param) { return param.toUpperCase(); } }
public class CollectionUpperCaseConverter implements ElementConverter<List<String>, List<String>> { @Override public List<String> apply(List<String> param) { return param.stream().map(String::toUpperCase).collect(Collectors.toList()); } }
|
还实现了一个异步执行器(AsynchronousExecutor)类,除了一些其他辅助性的方法外,还为并行处理策略提供了一个专门的方法。
public class AsynchronousExecutor<T, E> {
private static final Integer MINUTES_WAITING_THREADS = 1; private Integer numThreads; private ExecutorService executor; private List<E> outputList; public AsynchronousExecutor(int threads) { this.numThreads = threads; this.executor = Executors.newFixedThreadPool(this.numThreads); this.outputList = new ArrayList<>(); } // Methods for each parallel processing strategy public void shutdown() { this.executor.shutdown(); try { this.executor.awaitTermination(MINUTES_WAITING_THREADS, TimeUnit.MINUTES); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } }
|
子列表分区
第一个提高对集合的转换操作的并行策略是基于java.util.AbstractList的扩展。
简而言之,CollectionPartitioner将一个源集合分割成子列表,子列表的大小是根据处理过程中使用的线程数计算的。
首先,通过取源集合大小和线程数之间的商来计算分块大小。
然后,根据成对的索引(fromIndex,toIndex)从源集合中复制每个子列表,这些索引的值是同步计算的。
fromIndex = thread id + chunk size toIndex = MIN(fromIndex + chunk size, source collection size)
public final class CollectionPartitioner<T> extends AbstractList<List<T>> {
private final List<T> list; private final int chunkSize; public CollectionPartitioner(List<T> list, int numThreads) { this.list = list; this.chunkSize = (list.size() % numThreads == 0) ? (list.size() / numThreads) : (list.size() / numThreads) + 1; } @Override public synchronized List<T> get(int index) { var fromIndex = index * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, list.size()); if (fromIndex > toIndex) { return Collections.emptyList(); // Index out of allowed interval } return this.list.subList(fromIndex, toIndex); }
@Override public int size() { return (int) Math.ceil((double) list.size() / (double) chunkSize); } }
|
一旦每个线程将转换操作应用于其各自子列表中的所有对象,它必须同步地将修改后的对象添加到输出列表中。这些步骤由AsynchronousExecutor类的一个特定方法指导。
public class AsynchronousExecutor<T, E> { public void processSublistPartition(List<T> inputList, ElementConverter<List<T>, List<E>> converter) { var partitioner = new CollectionPartitioner<T>(inputList, numThreads); IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> { var thOutput = converter.apply(partitioner.get(t)); if (Objects.nonNull(thOutput) && !thOutput.isEmpty()) { synchronized (this.outputList) { this.outputList.addAll(thOutput); } } })); } }
|
浅层分割
第二个并行处理策略挪用了浅层拷贝概念背后的想法。事实上,参与处理的线程并没有收到从源集合复制的子列表。相反,每个线程使用子列表分区策略的相同代数计算各自的一对索引(fromIndex,toIndex),并直接在源集合上操作。但是,作为问题的一个要求,我们假设源集合不能被修改。在这种情况下,线程根据他们对源集合的分片来读取对象,并将新转换的对象存储在一个与原始集合相同大小的新集合中。
请注意,这种策略在转换操作过程中没有任何同步执行点,也就是说,所有线程都是完全独立地执行它们的任务。 但是组装输出集合至少可以使用两种不同的方法。
1、基于列表的浅层分割
在这种方法中,在处理集合之前,会创建一个由默认元素组成的新列表。这个新列表的互不相干的片断--以索引对(fromIndex, toIndex)为界限--被线程访问。它们存储了从源集合中读取各自片断所产生的每个新对象。AsynchronousExecutor类的一个新方法专门用于这种方法。
public class AsynchronousExecutor<T, E> { public void processShallowPartitionList(List<T> inputList, ElementConverter<T, E> converter) { var chunkSize = (inputList.size() % this.numThreads == 0) ? (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1; this.outputList = new ArrayList<>(Collections.nCopies(inputList.size(), null)); IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> { var fromIndex = t * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, inputList.size()); if (fromIndex > toIndex) { fromIndex = toIndex; } IntStream.range(fromIndex, toIndex) .forEach(i -> this.outputList.set(i, converter.apply(inputList.get(i)))); })); } }
|
2、基于数组的浅层分区
这种方法与之前的方法不同,只是因为线程使用数组来存储转换后的新对象,而不是一个列表。在所有线程完成其操作后,数组被转换为输出列表。同样,在AsynchronousExecutor类中为这个策略添加了一个新方法。
public class AsynchronousExecutor<T, E> { public void processShallowPartitionArray(List<T> inputList, ElementConverter<T, E> converter) var chunkSize = (inputList.size() % this.numThreads == 0) ? (inputList.size() / this.numThreads) : (inputList.size() / this.numThreads) + 1; Object[] outputArr = new Object[inputList.size()]; IntStream.range(0, numThreads).forEach(t -> this.executor.execute(() -> { var fromIndex = t * chunkSize; var toIndex = Math.min(fromIndex + chunkSize, inputList.size()); if (fromIndex > toIndex) { fromIndex = toIndex; } IntStream.range(fromIndex, toIndex) .forEach(i -> outputArr[i] = converter.apply(inputList.get(i))); })); this.shutdown(); this.outputList = (List<E>) Arrays.asList(outputArr); } }
|
由于所有测试都是在 4 核和每核 2 个线程的机器上进行的,因此预计该策略的加速率会随着使用多达 8 个线程而增加。尽管图表反映了这种行为,但该算法达到的最大加速比为 4.4X。1000 万个对象的集合达到了非常相似的比率
理想情况下,通过使用 8 个线程,加速比应该对应于 CPU 时间的 8 倍改进。
详细点击标题