ParallelStream并行流在之前文章JAVA8新特性-Stream API中有簡單的介紹過它的使用。如Collection集合可以通過parallelStream()的得到一個并行流。
Stream<Integer> stream = new ArrayList<Integer>().parallelStream();
串行流也可以通過parallel()方法轉為并行流
Stream<Integer> stream = new ArrayList<Integer>().stream().parallel();
筆者在學習的過程中,也對并行流有著很多的疑問
- 串行流和并行流哪個效率更高?(這還有疑問嗎?肯定是并行流呀?sure?)
- 并行流得到的結果是否一定準確?
- 它的實現機制是什么樣的?
- 開發中可以使用并行流嘛?
現在就讓我們來深入了解一下Java8的這個新特性——并行流
并行流的效率是否更高
在Java8以前,遍歷一個長度非常大的集合往往非常麻煩,如需要使用多個線程配合synchronized,Lock和Atomic原子引用等進行遍歷,且不說多線程之間的調度,多線程同步API的上手成本也比較高。
現在我們有更為簡單的遍歷方式,且不局限于遍歷集合。
先往一個List添加10萬條記錄,代碼比較簡單,單條記錄的內容使用UUID隨機生成的英文字符串填充
List<String> list = new ArrayList<String>();
for (int i = 0; i < 100000; i++) {
list.add(UUID.randomUUID().toString());
}
普通for循環該List,然后將每條記錄中的a替換成b
for (int i = 0; i < list.size(); i++) {
String s = list.get(i);
String replace = s.replace("a", "b");
}
注意:這里使用String replace = s.replace("a", "b");這一行代碼作為簡單的業務處理,而不是System.out.println(s),因為打印的時候存在synchronized同步機制,會嚴重影響并行流的效率!
增強for循環
for (String s : list) {
String replace = s.replace("a", "b");
}
串行流
list.stream().forEach((s)->{
String replace = s.replace("a", "b");
});
并行流
list.parallelStream().forEach((s)->{
String replace = s.replace("a", "b");
});
在保證執行機器一樣的情況下,上述遍歷代碼各執行十次,取執行時間的平均值,單位毫秒,結果如下:
從結果中可知,在數據量較大的情況下,普通for,增強for和串行流的差距并不是很大,而并行流則以肉眼可見的差距領先于另外三者!
數據量較大的情況下,并行流的遍歷效率數倍于順序遍歷,在小數據量的情況下,并行流的效率還會那么高嗎?
將上面10萬的數據量改為1000,然后重復一百次取平均值,結果如下:
對結果進行分析,現在開發中比較少見的普通for遍歷集合的方式,居然是順序遍歷中速度最快的!而它的改進版增強for速度小遜于普通for。
究其原因,是增強for內部使用迭代器進行遍歷,需要維護ArrayList中的size變量,故而增加了時間開銷。
而串行流的時間開銷確實有點迷,可能的原因是開啟流和關閉流的時間開銷比較大
并行流花費的時間仍然優秀于另外的三種遍歷方式!
不過,有一點需要注意的是,并行流在執行時,CPU的占用會比另外三者高
現在我們可以得到一個結論,并行流在大數據量時,對比其它的遍歷方式有幾倍的提升,而在數據量比較小時,提升不明顯。
并行流處理結果是否準確
這個準確,舉個例子來說,我希望遍歷打印一個存有0 1 2 3 4 5 6 7 8 9的list,如0 1 2 3 4 5 6 7 8 9,代碼可能會這么寫
//數據
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
//遍歷打印
list.stream().forEach(i -> System.out.print(i + " "));
打印的結果如下:
0 1 2 3 4 5 6 7 8 9
結果沒有任何問題,如果是并行流呢?遍歷代碼如下
list.parallelStream().forEach(i -> System.out.print(i + " "));
打印的結果如下:
6 5 1 0 9 3 7 8 2 4
第二次打印的結果如下:
6 5 0 1 7 9 8 3 4 2
可以看到打印出來的順序是混亂無規律的
那是什么原因導致的呢?
并行流內部使用了默認的ForkJoinPool線程池,所以它默認的線程數量就是處理器的數量,通過Runtime.getRuntime().availableProcessors()可以得到這個值。
筆者電腦的線程數是12,這意味著并行流最多可以將任務劃分為12個小模塊進行處理,然后再合并計算得到結果
如將0~9這是個數字進行劃分:
0 1 2 3 4 5 6 7 8 9
第一次劃分得到兩個小模塊:
0 1 2 3 4
5 6 7 8 9
第二次劃分得到四個小模塊:
0 1 2
3 4
5 6 7
8 9
第三次劃分得到八個小模塊:
0 1
2
3
4
5 6
7
8
9
第三次劃分時,2 3 4這些數據,明顯已經不能再繼續劃分,故而2 3 4 這些數據可以先進行打印
第四次劃分得到10個小模塊:
0
1
2
3
4
5
6
7
8
9
這些小模塊在無法繼續細分后就會被打印,而打印處理的時候為了提高效率,不分先后順序,故而造成打印的亂序
結合以上的測試數據,我們可以得到這樣一個結論,當需要遍歷的數據,存在強順序性時,不能使用并行流,如順序打印0~9;不要求順序性時,可以使用并行流以提高效率,如將集合中的字符串中的"a"替換成"b"
并行流的實現機制
在Java7時,就已經提供了一個并發執行任務的API,Fork/Join,將一個大任務,拆分成若干個小任務, 再將各個小任務的運行結果匯總成最終的結果。
而在java8提供的并行流中,在實現Fork/Join的基礎上,還用了工作竊取模式來獲取各個小模塊的運行結果,使之效率更高!這個知識點筆者后續會另外寫一篇文章來介紹,敬請期待。
我們也可以使用Fock/Join機制,模仿一下并行流的實現過程。
如:進行數據的累加
public class ForkJionCalculate extends RecursiveTask<Long> {
private long start;
private long end;
/**
* 臨界值
*/
private static final long THRESHOLD = 10000L;
public ForkJionCalculate(long start, long end) {
this.start = start;
this.end = end;
}
/**
* 計算方法
* @return
*/
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJionCalculate left = new ForkJionCalculate(start, middle);
left.fork();//拆分,并將該子任務壓入線程隊列
ForkJionCalculate right = new ForkJionCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
處理類需要實現RecursiveTask<T>接口,還需指定一個臨界值,臨界值的作用就是指定將任務拆分到什么程度就不拆了
測試代碼:
public static void main(String[] args) {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJionCalculate task = new ForkJionCalculate(0, 10000000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗費時間:" + Duration.between(start, end).toMillis());
}
并行流的適用場景
其實Java這門編程語言其實有很多種用途,通過swing類庫可以構建圖形用戶界面,配合ParallelGC進行一些科學計算任務,不過最廣泛的用途,還是作為一門服務器語言,開發服務器應用,我們以這種方式進行測試。
我們使用SpringBoot構建一個工程,然后寫一個控制器類,在控制器類中,如上進行1000和10萬的數據量測試
另外使用PostMan發送1000并發調用該接口,取平均時間,單位為毫秒值
控制器類測試代碼:
@RequestMApping("/parallel")
@ResponseBody
public String parallel() {
//生成測試數據
List<String> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(UUID.randomUUID().toString());
}
//普通for遍歷
for (int i = 0; i < 1000; i++) {
String s = list.get(i);
String replace = s.replace("a", "b");
}
return "SUCCESS";
}
數據量1000時,每次請求消耗的時間
數據量10W時,每次請求消耗的時間
在之前的測試中,并行流對比其他的遍歷方式都有兩倍以上的差距,而在并發量較大的情況下,服務器線程本身就處于繁忙的狀態,即使使用并行流,優化的空間也不是很大,而且CPU的占用率也會比較高。故而可以看到,并行流在數據量1000或者10萬時,提升不是特別明顯。
但是并不是說并行流不能用于平常的開發中,如CPU本身的負載不高的情況下,還是可以使用的;在一些定時任務的項目中,為了縮短定時任務的執行時間,也可以斟酌使用。
最后總結一下:在數據量比較大的情況下,CPU負載本身不是很高,不要求順序執行的時候,可以使用并行流。
來源:
blog.csdn.net/weixiang2039/article/details/107102364