点击上方 Java后端,选择 设为星标
优质文章,及时送达
在 Java 7 之前,如果想要并行处理一个集合,我们需要以下几步
1. 手动分成几部分
2. 为每部分创建线程
3. 在适当的时候合并。
并且还需要关注多个线程之间共享变量的修改问题。而 Java8 为我们提供了并行流,可以一键开启并行模式。是不是很酷呢?让我们来看看。
并行流
认识和开启并行流
集合,而 list 中每个 apple 对象只有重量,我们也知道 apple 的单价是 5元/kg,现在需要计算出每个 apple 的单价,传统的方式是这样:
List<Apple> appleList = new ArrayList<>(); // 假装数据是从库里查出来的
for (Apple apple : appleList) {
apple.setPrice(5.0 * apple.getWeight() / 1000);
}
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
parallelStream()
方法。当然也可以通过 stream.parallel() 将普通流转换成并行流。并行流也能通过 sequential() 方法转换为顺序流,但要注意:流的并行和顺序转换不会对流本身做任何实际的变化,仅仅是打了个标记而已。并且在一条流水线上对流进行多次并行 / 顺序的转换,生效的是最后一次的方法调用
测试并行流的性能
public static void main(String[] args) throws InterruptedException {
List<Apple> appleList = initAppleList();
Date begin = new Date();
for (Apple apple : appleList) {
apple.setPrice(5.0 * apple.getWeight() / 1000);
Thread.sleep(1000);
}
Date end = new Date();
log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
}
List<Apple> appleList = initAppleList();
Date begin = new Date();
appleList.parallelStream().forEach(apple ->
{
apple.setPrice(5.0 * apple.getWeight() / 1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
);
Date end = new Date();
log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
并行流可以随便用吗?
可拆分性影响流的速度
1. 对于 iterate 方法来处理的前 n 个数字来说,不管并行与否,它总是慢于循环的,非并行版本可以理解为流化操作没有循环更偏向底层导致的慢。可并行版本是为什么慢呢?这里有两个需要注意的点:
-
iterate 生成的是装箱的对象,必须拆箱成数字才能求和
-
我们很难把 iterate 分成多个独立的块来并行执行
这个问题很有意思,我们必须意识到某些流操作比其他操作更容易并行化。对于 iterate 来说,每次应用这个函数都要依赖于前一次应用的结果。因此在这种情况下,我们不仅不能有效的将流划分成小块处理。反而还因为并行化再次增加了开支。
2. 而对于 LongStream.rangeClosed() 方法来说,就不存在 iterate 的第两个痛点了。它生成的是基本类型的值,不用拆装箱操作,另外它可以直接将要生成的数字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 这样四部分。因此并行状态下的 rangeClosed() 是快于 for 循环外部迭代的:
package lambdasinaction.chap7;
import java.util.stream.*;
public class ParallelStreams {
public static long iterativeSum(long n) {
long result = 0;
for (long i = 0; i <= n; i++) {
result += i;
}
return result;
}
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
}
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
}
public static long rangedSum(long n) {
return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
}
public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
}
}
package lambdasinaction.chap7;
import java.util.concurrent.*;
import java.util.function.*;
public class ParallelStreamsHarness {
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
public static void main(String[] args) {
System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
}
public static <T, R> long measurePerf(Function<T, R> f, T input) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
R result = f.apply(input);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + result);
if (duration < fastest) fastest = duration;
}
return fastest;
}
}
共享变量修改的问题
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
顺序执行每次输出的结果都是:50000005000000,而并行执行的结果却五花八门了。这是因为每次访问 totle 都会存在数据竞争,关于数据竞争的原因,大家可以看看关于 volatile 的博客。因此当代码中存在修改共享变量的操作时,是不建议使用并行流的。
并行流的使用注意
在并行流的使用上有下面几点需要注意:
-
对于较少的数据量,不建议使用并行流
-
容易拆分成块的流数据,建议使用并行流
-
以下是一些常见的集合框架对应流的可拆分性能表
Java后端交流群已成立
公众号运营至今,离不开小伙伴们的支持。为了给小伙伴们提供一个互相交流的平台,特地开通了官方交流群。扫描下方二维码备注 进群 或者关注公众号 Java后端 后获取进群通道。
推 荐 阅 读 1. 抓包神器:Charles 2. Spring Boot 打包上传至 Docker 仓库? 3. 2W 字详解设计模式 4. 连夜撸了一个简易聊天室 5. 推荐一款 Java 对象映射神器
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。