A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

java8推出steam流概念,像管道一样,将数据比作液体,在管道中进行处理.其中并行流之前不是很了解.在实际开发中担心线程开启数量不可控,会导致生产环境潜在风险.经过最近对它的了解,对并行流的使用有了新的认识.

一 性能方面:

1.串行流与并行流在数据量不大的前提下,串行处理的效率要比并行操作高.只有在较大数据前提下,才能显现并行的优势.

2.在对有顺序依赖的操作,如limit,findFirst等流操作时,并行操作需要做合并处理,非常消耗资源,建议先将流转为串行,在做顺序依赖操作.    list.parallelStream().sequential()  ,这两个方法可以多次调用, 只有最后一个调用决定这个流是顺序的还是并发的

3.在处理集合方面,arrayList,IntStream.range,HashSet,TreeSet等结构分布均匀的数据,处理效率较高,对LinkedList这种链表结构处理上就不那么给力了.

二 并行线程数量

1.并行流在启动线程上,默认会调用 Runtime.getRuntime().availableProcessors(),获取JVM底层最大设备线程数.(并不一定是设备实际线程数,这个自行查阅吧).

2.如果想设置并行线程启动数量,则需要全局设置

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

三 并行底层理解

1.fork/join 框架,为jdk1.7引入.java8的stream多线程并行流的正是以这个框架为基础的.fork/join框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配线程池(ForkJoinPool)中的工作线程.

简单例子代码:



  • import java.util.concurrent.ForkJoinPool;



  • import java.util.concurrent.ForkJoinTask;



  • import java.util.concurrent.RecursiveTask;



  • import java.util.stream.LongStream;







  • /**



  • * Created by sunjin on 2016/7/5.



  • * 继承RecursiveTask来创建可以用于分支/合并的框架任务



  • */



  • public class ForkJoinSumCalculator extends RecursiveTask<Long> {



  •     //要求和的数组



  •     private final long[] numbers;



  •     //子任务处理的数组开始和终止的位置



  •     private final int start;



  •     private final int end;



  •     //不在将任务分解成子任务的阀值大小



  •     public static final int THRESHOLD = 10000;







  •     //用于创建组任务的构造函数



  •     public ForkJoinSumCalculator(long[] numbers){



  •         this(numbers, 0, numbers.length);



  •     }







  •     //用于递归创建子任务的构造函数



  •     public ForkJoinSumCalculator(long[] numbers,int start,int end){



  •         this.numbers = numbers;



  •         this.start = start;



  •         this.end = end;



  •     }







  •     //重写接口的方法



  •     @Override



  •     protected Long compute() {



  •         //当前任务负责求和的部分的大小



  •         int length = end - start;



  •         //如果小于等于阀值就顺序执行计算结果



  •         if(length <= THRESHOLD){



  •             return computeSequentially();



  •         }



  •         //创建子任务来为数组的前一半求和



  •         ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);



  •         //将子任务拆分出去,丢到ForkJoinPool线程池异步执行。



  •         leftTask.fork();



  •         //创建子任务来为数组的后一半求和



  •         ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);



  •         //第二个任务直接使用当前线程计算而不再开启新的线程。



  •         long rightResult = rightTask.compute();



  •         //读取第一个子任务的结果,如果没有完成则等待。



  •         long leftResult = leftTask.join();



  •         //合并两个子任务的计算结果



  •         return rightResult + leftResult;



  •     }







  •     //顺序执行计算的简单算法



  •     private long computeSequentially(){



  •         long sum = 0;



  •         for(int i =start; i< end; i++){



  •             sum += numbers;



  •         }



  •         return sum;



  •     }



  •     //提供给外部使用的入口方法



  •     public static long forkJoinSum(long n) {



  •         long[] numbers = LongStream.rangeClosed(1, n).toArray();



  •         ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);



  •         return new ForkJoinPool().invoke(task);



  •     }



  • }


2.工作盗取:

由于各种因素,即便任务拆分是平均的,也不能保证所有子任务能同时执行结束, 大部分情况是某些子任务已经结束, 其他子任务还有很多, 在这个时候就会有很多资源空闲, 所以fork/join框架通过工作盗取机制来保证资源利用最大化, 让空闲的线程去偷取正在忙碌的线程的任务。

在没有任务线程中的任务存在一个队列当中, 线程每次会从头部获取一个任务执行,执行完了再从queue的头部获取一个任务,直到队列中的所有任务执行完,这个线程偷取别的线程队列中的任务时会从队列到尾部获取任务,并且执行,直到所有任务执行结束。

从这个角度分析,任务的粒度越小, 资源利用越充分。


1 个回复

倒序浏览
奈斯,感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马