public class StreamDemo7 {
//并行方式1
public long Parsum1(long n) {
Long sum= Stream.iterate(1L, i->i+1)
.limit(n)
.parallel()
.reduce(0L,Long::sum);
return sum;
}
//并行方式2
public long Parsum2(long n) {
//在数量大的时候,有初始值比没初始值速度更快
Long sum= LongStream.rangeClosed(0, n)
.parallel()
.reduce(0L, Long::sum);
return sum;
//这里没有拆装箱
}
//并行方式3
public long Parsum3(long n) {
Long sum= LongStream.rangeClosed(0, n)
.parallel()
.reduce(0L,(a,b)->a+b);
return sum;
}
//窜行
public long cuanSum(long n) {
Long sum= LongStream.rangeClosed(0, n)
.reduce(0L, Long::sum);
return sum;
}
//for
public long sumFor(long n) {
long sum=0L;
for (long i = 1; i <= n; i++) {
sum+=i;
}
return sum;
}
//传统线程
public long callSum(long n) throws InterruptedException, ExecutionException {
ExecutorService es= Executors.newFixedThreadPool(8);
long avg=n/8;
long num2=avg*2;
long num3=avg*3;
long num4=avg*4;
long num5=avg*5;
long num6=avg*6;
long num7=avg*7;
//运行线程,返回结果
Future<Long> fu1=es.submit(new MyCallable(1, avg));
Future<Long> fu2=es.submit(new MyCallable(avg+1, num2));
Future<Long> fu3=es.submit(new MyCallable(num2+1, num3));
Future<Long> fu4=es.submit(new MyCallable(num3+1, num4));
Future<Long> fu5=es.submit(new MyCallable(num4+1, num5));
Future<Long> fu6=es.submit(new MyCallable(num5+1, num6));
Future<Long> fu7=es.submit(new MyCallable(num6+1, num7));
Future<Long> fu8=es.submit(new MyCallable(num7+1, n));
@Test
public void testName() throws InterruptedException, ExecutionException {
long start=0L;
long duration=0L;
long sum=0L;
long temp=Long.MAX_VALUE;
//
// for (int i = 0; i < 2; i++) {
start=System.currentTimeMillis();
sum=sumFor(10_0000_0000);
duration=System.currentTimeMillis()-start;
if (duration<temp) {
temp=duration;
}
// }
System.out.println("sum="+sum);
System.out.println("最小时间"+temp+"毫秒");
}
static long sum=0;
public static void add(long n) {
sum+=n;
}
}
class MyCallable implements Callable<Long>{
private long sum1;
private long sum2;
public MyCallable(long sum1,long sum2) {
this.sum1=sum1;
this.sum2=sum2;
}
@Override
public Long call() throws Exception {
long sum=0;//定义在里面更好一些
for (long i = sum1; i < sum2+1; i++) {
sum+=i;
}
//返回计算结果
return sum;
}
}
既然并行流使用了frok-join,肯定有一种自动机制来为你拆分流。这种新的自动机制称为Spliterator
2.虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的
最佳做法。
对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子
任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,
因为每个子任务都必须等待另一个子任务完成才能启动。
不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直
接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用
它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为
其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面
看栈跟踪(stack trace)来找问题,但放在分支合并计算上就不行了,因为调用compute
的线程并不是概念上的调用方,后者是调用fork的那个。
和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计
算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时
有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方
法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出
同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就
像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编
译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么
做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死
码分析——删去从未被使用的计算)。
对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步
拆分还是已小到可以顺序求值
看个例子:
public class FrokJoinDemo extends RecursiveTask<Long>{
private static final long serialVersionUID = 1L;
//需求自已分组和合并
private long start=0;
private long end=0;
private static final long THRESHOLD=1_0000L;
public FrokJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end-start<=THRESHOLD) {
for (long i = start; i <= end; i++) {
sum+=i;
}
}else{
//进行细分
long middle=(start+end)/2;
FrokJoinDemo left = new FrokJoinDemo(start, middle);
FrokJoinDemo right = new FrokJoinDemo(middle+1, end);
left.fork();
right.fork();
sum=left.join()+right.join();
}
return sum;
}
测试代码:
public void test1() throws InterruptedException, ExecutionException {
Instant start=Instant.now();