今天,我將通過一個例子向大家介紹幾種常見的并發編程方案。
我們通過一個程序模擬統計一批文檔的字數。
首先我們先看無并發情況下的DEMO:
// 用Doc代表文檔
public class Doc {
public final String content;
public Doc(String content) {
this.content = content;
}
}
// 模擬目錄和文件
public class Dir {
public final static List<Doc> docs = new ArrayList<>();
static {
docs.add( new Doc("""
《三國演義》是綜合民間傳說和戲曲、話本,結合陳壽的《三國志》、范曄《后漢書》、
元代《三國志平話》、和裴松之注的史料,以及作者個人對社會人生的體悟寫成。現所見刊本
以明嘉靖本最早,分24卷,240則。清初毛宗崗父子又做了一些修改,并成為現在最常見的120回本。""") );
docs.add( new Doc("""
《水滸傳》是中國歷史上第一部用古白話文寫成的歌頌農民起義的長篇章回體版塊結構小說,
以宋江領導的起義軍為主要題材,通過一系列梁山英雄反抗壓迫、英勇斗爭的生動故事,暴露
了北宋末年統治階級的腐朽和殘暴,揭露了當時尖銳對立的社會矛盾和“官逼民反的殘酷現實。""") );
docs.add( new Doc("""
《西游記》前七回敘述孫悟空出世,有大鬧天宮等故事。此后寫孫悟空隨唐僧西天取經,
沿途除妖降魔、戰勝困難的故事。書中唐僧、孫悟空、豬八戒、沙僧等形象刻畫生動,規模宏大,
結構完整,并且《西游記》富有濃厚的中國佛教色彩,其隱含意義非常深遠,眾說紛紜,見仁見智。
可以從佛、道、俗等多個角度欣賞,是中國古典小說中偉大的浪漫主義文學作品。""") );
docs.add( new Doc("""
《紅樓夢》講述的是發生在一個虛構朝代的封建大家庭中的人事物,其中以賈寶玉、
林黛玉、薛寶釵三個人之間的感情糾葛為主線通過對一些日常事件的描述體現了在賈府
的大觀園中以金陵十二釵為主體的眾女子的愛恨情愁。""") );
}
}
public class wordCount {
int countDoc( Doc doc ) {
return doc.content.length();
}
void count() {
int c = 0;
for ( Doc doc : Dir.docs ) {
c += countDoc(doc);
}
System.out.println( "所有文檔字數總計: " + c + "個" );
}
public static void mAIn(String[] args) {
new WordCount().count();
}
}
執行結果:
1.共享內存與鎖
這是一種非常熟悉和常見的技術,JAVA在這方面的實現非常出色。使用它通常會經歷三個階段:初學時感覺復雜和可怕,掌握后變得非常好和強大,進一步深入學習后又變得復雜和具有一定危險性。
共享內存允許多個進程同時讀寫一塊或多塊常規內存區域。有時候,進程需要在這些內存區域上執行一系列具有原子性的操作,其他進程在這些操作完成之前不能訪問這些區域。為了解決這個問題,我們可以使用鎖,這是一種只允許一個進程訪問某種資源的機制。
這個方案存在多個缺點:
-
即使沖突的概率很低,鎖的開銷仍然無法忽略。
-
這些鎖也是內存系統中的競爭熱點。
-
如果出現錯誤的進程不處理鎖,可能會導致正在加鎖的鎖被丟棄。
-
當鎖出現問題時,調試極為困難。
此外,當使用鎖來同步兩三個進程時,可能沒有太大問題。然而,隨著進程數量的增加,情況會變得越來越難以控制。最終,這可能導致復雜的死鎖問題,即使是最經驗豐富的開發者也無法預見。
這個方案大家都比較熟熟悉,通過多個線共同處理文章:
// 計數器通過同步保障多線計數
public class Counter1 {
private int c = 0;
public synchronized void inc( int n ) {
this.c += n;
}
public synchronized int totalNumber() {
return c;
}
}
// 文檔字數計算
public class DocProc1 implements Runnable {
private final Counter1 counter;
public final Doc doc;
public DocProc1(Counter1 counter, Doc doc) {
this.counter = counter;
this.doc = doc;
}
public void run() {
int c = countDoc( this.doc );
counter.inc( c );
}
int countDoc( Doc doc ) {
return doc.content.length();
}
}
public class WordCount1 {
private final Counter1 counter = new Counter1();
void count() throws InterruptedException {
List<Thread> threads = new ArrayList<>();
// 啟動多個線程處理文章
for ( Doc doc : Dir.docs ) {
DocProc1 docProc1 = new DocProc1(counter , doc);
Thread t = new Thread(docProc1);
threads.add( t );
t.start();
}
for (Thread t : threads ) {
t.join();
}
System.out.println( "所有文檔字數總計: " + this.counter.totalNumber() + "個" );
}
public static void main(String[] args) throws InterruptedException {
new WordCount1().count();
}
}
執行結果:
2.軟件事務性內存(STM)
STM(Software Transactional Memory,軟件事務性內存)是一種將內存視為傳統數據庫,并使用事務來確定何時寫入什么內容的方法。
通常,這種實現采用樂觀的方式來避免鎖的使用。它將一組讀寫訪問視為一個單獨的操作,如果兩個進程同時嘗試訪問共享區域,則它們各自啟動一個事務,最終只有一個事務會成功。另一個進程會意識到事務失敗,并在檢查共享區域的新內容后進行重試。
該模型直截了當,誰都不需要等待其他進程釋放鎖。
STM的主要缺點是必須重試失敗的事務,甚至可能多次失敗。此外,事務系統本身也會帶來相當大的開銷,并且在確定哪個進程成功之前,需要額外的內存來存儲試圖寫入的數據。理想情況下,系統應該像支持虛擬內存那樣為事務性內存提供硬件支持。
對于程序員來說,STM的可控性似乎比鎖更好,只要競爭不頻繁導致事務重啟,就能充分發揮并發的優勢。我們認為這種方法本質上是持鎖共享內存的一種變體,其在操作系統層面的作用比應用編程層面更為重要。然而,針對這個問題的研究仍然非常活躍,局勢可能會發生改變。
Java并沒有直接支持STM方案,因此要實現一個通用的庫會相對復雜。在這里,我們簡單介紹一下它的原理。
public class Counter2 {
private final AtomicInteger c = new AtomicInteger();
// 只為表達STM原理
public boolean inc( int n ) {
int oldVal = c.get();
int newVal = oldVal + n;
return c.compareAndSet(oldVal , newVal);
}
public int totalNumber() {
return c.get();
}
}
public class DocProc2 implements Runnable {
private final Counter2 counter;
public final Doc doc;
public DocProc2(Counter2 counter, Doc doc) {
this.counter = counter;
this.doc = doc;
}
public void run() {
int c = countDoc( this.doc );
while (!counter.inc( c ));
}
int countDoc( Doc doc ) {
return doc.content.length();
}
}
public class WordCount2 {
private final Counter2 counter = new Counter2();
void count() throws InterruptedException {
List<Thread> threads = new ArrayList<>();
for ( Doc doc : Dir.docs ) {
DocProc2 docProc = new DocProc2(counter , doc);
Thread t = new Thread(docProc);
threads.add( t );
t.start();
}
for (Thread t : threads ) {
t.join();
}
System.out.println( "所有文檔字數總計: " + this.counter.totalNumber() + "個" );
}
public static void main(String[] args) throws InterruptedException {
new WordCount2().count();
}
}
執行結果:
3.Future
另一個更現代的手段是采用所謂的future。
該方法的基本思路是,每個future都代表一個計算結果,這個結果被外包給其他進程處理,這個進程可以在其他CPU甚至其他計算機上運行。
Future可以像其他對象一樣被傳遞,但在計算完成之前無法讀取結果,必須等待計算完成。雖然這種方法簡化了并發系統中的數據傳遞,但也使得程序在遠程進程故障和網絡故障的情況下變得脆弱。當計算結果尚未準備好而連接斷開時,試圖訪問值的代碼將無法執行。
public class DocProc3 implements Callable<Integer> {
public final Doc doc;
public DocProc3(Doc doc) {
this.doc = doc;
}
public Integer call() {
return countDoc( this.doc );
}
int countDoc( Doc doc ) {
return doc.content.length();
}
}
public class WordCount3 {
void count() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<Integer>> futures = new ArrayList<>();
for ( Doc doc : Dir.docs ) {
Future<Integer> future = executorService.submit(new DocProc3(doc));
futures.add(future);
}
int c = 0;
for (Future<Integer> f : futures ) {
c += f.get();
}
System.out.println( "所有文檔字數總計: " + c + "個" );
executorService.shutdownNow();
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
new WordCount3().count();
}
}
執行結果:
4.函數式編程
函數式編程是一種計算機科學中的編程范式,它通過應用和組合函數來構建程序。它是一種聲明式編程范式,其中函數定義是將值映射到其他值的表達式樹,而不是通過一系列命令式語句來更新程序運行狀態。
在函數式編程中,函數被認為是一種重要的元素,它們可以被賦予名稱(包括本地標識符),作為參數傳遞,并且可以像其他數據類型一樣從其他函數返回。這種特性使得程序可以以聲明性和可組合的方式編寫,通過模塊化地組合小功能。
篇幅有限,這里粘貼了一段百度百科的內容。函數通過保持不變性和沒有副作用,天然具備線程安全性,可以放心地在并發環境中使用。
public class WordCount4 {
int countDoc( Doc doc ) {
return doc.content.length();
}
void count() {
long total = Dir.docs.parallelStream().mapToInt(this::countDoc).sum();
System.out.println( "所有文檔字數總計: " + total + "個" );
}
public static void main(String[] args) {
new WordCount4().count();
}
}
執行結果:
5.消息傳遞
這是一個比較現實的工作場景,團隊成員需要協同工作。當某人需要另一個人處理事情時,他會發送一條消息給對方。收到消息后,另一個人會處理事情,并在完成后回復一條消息。
消息傳遞是一種通信方式,它意味著接收進程實際上獲得了一份獨立的數據副本,而發送方無法感知接收方對該副本的任何操作。唯一能向發送方回傳信息的方式是通過發送另一條消息。因此,不論收發雙方是在同一臺機器上還是被網絡隔離,它們都能以相同的方式進行通信。
消息傳遞一般可分為同步方式和異步方式。在同步方式下,發送方在消息抵達接收端之前無法進行其他操作;而在異步方式下,一旦消息被投遞,發送方就可以立即開始處理其他事務。(在現實世界中,機器之間的同步通信通常要求接收方給發送方發送一個確認消息,以告知一切正常,但這些細節對程序員來說可以是透明的。
java 自身沒有實現該模式,AKKA開源的庫實現了此模式。
嚴格說所有線程間通訊都應該用消息,我用個例子簡單表達一下原理:
public class Counter5 {
public Counter5() {
this.procMail();
}
private final BlockingQueue<Integer> box = new ArrayBlockingQueue<>(100);
private int c;
public void inc( int n ) {
try {
box.put(n);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private volatile boolean stop = false;
private void procMail() {
new Thread() {
@Override
public void run() {
while (!stop) {
try {
Integer n = box.poll(100 , TimeUnit.MILLISECONDS);
if ( n != null ) {
c += n;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}.start();
}
public int totalNumber() throws InterruptedException {
while ( ! box.isEmpty() ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
this.stop = true;
return c;
}
}
public class DocProc5 implements Runnable {
private final Counter5 counter;
public final Doc doc;
public DocProc5(Counter5 counter, Doc doc) {
this.counter = counter;
this.doc = doc;
}
public void run() {
int n = countDoc( this.doc );
counter.inc(n);
}
int countDoc( Doc doc ) {
return doc.content.length();
}
}
public class WordCount5 {
private final Counter5 counter = new Counter5();
void count() throws InterruptedException {
List<Thread> threads = new ArrayList<>();
for ( Doc doc : Dir.docs ) {
DocProc5 docProc = new DocProc5(counter , doc);
Thread t = new Thread(docProc);
threads.add( t );
t.start();
}
for (Thread t : threads ) {
t.join();
}
System.out.println( "所有文檔字數總計: " + this.counter.totalNumber() + "個" );
}
public static void main(String[] args) throws InterruptedException {
new WordCount5().count();
}
}
執行結果