Disruptor是一個(gè)開源框架,研發(fā)的初衷是為了解決高并發(fā)下隊(duì)列鎖的問題,最早由LMAX提出并使用,能夠在無鎖的情況下實(shí)現(xiàn)隊(duì)列的并發(fā)操作,并號(hào)稱能夠在一個(gè)線程里每秒處理6百萬筆訂單
官網(wǎng):lmax-exchange.github.io/disruptor/
目前,包括Apache Storm、Camel、Log4j2在內(nèi)的很多知名項(xiàng)目都應(yīng)用了Disruptor以獲取高性能
為什么會(huì)產(chǎn)生Disruptor框架
「目前JAVA內(nèi)置隊(duì)列保證線程安全的方式:」
ArrayBlockingQueue:基于數(shù)組形式的隊(duì)列,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;
LinkedBlockingQueue:基于鏈表形式的隊(duì)列,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;
ConcurrentLinkedQueue:基于鏈表形式的隊(duì)列,通過CAS的方式
我們知道,在編程過程中,加鎖通常會(huì)嚴(yán)重地影響性能,所以盡量用無鎖方式,就產(chǎn)生了Disruptor這種無鎖高并發(fā)框架
基本概念
參考地址:github.com/LMAX-Exchan…
RingBuffer——Disruptor底層數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn),核心類,是線程間交換數(shù)據(jù)的中轉(zhuǎn)地;
Sequencer——序號(hào)管理器,生產(chǎn)同步的實(shí)現(xiàn)者,負(fù)責(zé)消費(fèi)者/生產(chǎn)者各自序號(hào)、序號(hào)柵欄的管理和協(xié)調(diào),Sequencer有單生產(chǎn)者,多生產(chǎn)者兩種不同的模式,里面實(shí)現(xiàn)了各種同步的算法;
Sequence——序號(hào),聲明一個(gè)序號(hào),用于跟蹤ringbuffer中任務(wù)的變化和消費(fèi)者的消費(fèi)情況,disruptor里面大部分的并發(fā)代碼都是通過對Sequence的值同步修改實(shí)現(xiàn)的,而非鎖,這是disruptor高性能的一個(gè)主要原因;
SequenceBarrier——序號(hào)柵欄,管理和協(xié)調(diào)生產(chǎn)者的游標(biāo)序號(hào)和各個(gè)消費(fèi)者的序號(hào),確保生產(chǎn)者不會(huì)覆蓋消費(fèi)者未來得及處理的消息,確保存在依賴的消費(fèi)者之間能夠按照正確的順序處理
EventProcessor——事件處理器,監(jiān)聽RingBuffer的事件,并消費(fèi)可用事件,從RingBuffer讀取的事件會(huì)交由實(shí)際的生產(chǎn)者實(shí)現(xiàn)類來消費(fèi);它會(huì)一直偵聽下一個(gè)可用的序號(hào),直到該序號(hào)對應(yīng)的事件已經(jīng)準(zhǔn)備好。
EventHandler——業(yè)務(wù)處理器,是實(shí)際消費(fèi)者的接口,完成具體的業(yè)務(wù)邏輯實(shí)現(xiàn),第三方實(shí)現(xiàn)該接口;代表著消費(fèi)者。
Producer——生產(chǎn)者接口,第三方線程充當(dāng)該角色,producer向RingBuffer寫入事件。
Wait Strategy:Wait Strategy決定了一個(gè)消費(fèi)者怎么等待生產(chǎn)者將事件(Event)放入Disruptor中。
等待策略
源碼地址:github.com/LMAX-Exchan…
「BlockingWaitStrategy」
Disruptor的默認(rèn)策略是BlockingWaitStrategy。在BlockingWaitStrategy內(nèi)部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)。
「SleepingWaitStrategy」
SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,通過使用LockSupport.parkNanos(1)來實(shí)現(xiàn)循環(huán)等待。
「YieldingWaitStrategy」
YieldingWaitStrategy是可以使用在低延遲系統(tǒng)的策略之一。YieldingWaitStrategy將自旋以等待序列增加到適當(dāng)?shù)闹怠T谘h(huán)體內(nèi),將調(diào)用Thread.yield()以允許其他排隊(duì)的線程運(yùn)行。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
「BusySpinWaitStrategy」
性能最好,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線程數(shù)小于CPU邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
「PhasedBackoffWaitStrategy」
自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲并不重要的場景。
使用舉例
參考地址:github.com/LMAX-Exchan…
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
//定義事件event 通過Disruptor 進(jìn)行交換的數(shù)據(jù)類型。
public class LongEvent { private Long value; public Long getValue() {
return value;
} public void setValue(Long value) { this.value = value; }}
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}}
//定義事件消費(fèi)者
public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消費(fèi)者:"+event.getValue());
}}
//定義生產(chǎn)者
public class LongEventProducer {
public final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 1.ringBuffer 事件隊(duì)列 下一個(gè)槽
long sequence = ringBuffer.next();
Long data = null;
try {
//2.取出空的事件隊(duì)列
LongEvent longEvent = ringBuffer.get(sequence);
data = byteBuffer.getLong(0);
//3.獲取事件隊(duì)列傳遞的數(shù)據(jù)
longEvent.setValue(data);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
System.out.println("生產(chǎn)這準(zhǔn)備發(fā)送數(shù)據(jù)");
//4.發(fā)布事件
ringBuffer.publish(sequence);
}
}
}
public class DisruptorMain {
public static void main(String[] args) {
// 1.創(chuàng)建一個(gè)可緩存的線程 提供線程來出發(fā)Consumer 的事件處理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.創(chuàng)建工廠
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.創(chuàng)建ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
// 4.創(chuàng)建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.連接消費(fèi)端方法
disruptor.handleEventsWith(new LongEventHandler());
// 6.啟動(dòng)
disruptor.start();
// 7.創(chuàng)建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.創(chuàng)建生產(chǎn)者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定緩沖區(qū)大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.關(guān)閉disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
}
核心設(shè)計(jì)原理
Disruptor通過以下設(shè)計(jì)來解決隊(duì)列速度慢的問題:
「環(huán)形數(shù)組結(jié)構(gòu):」
為了避免垃圾回收,采用數(shù)組而非鏈表。同時(shí),數(shù)組對處理器的緩存機(jī)制更加友好
?
原因:CPU緩存是由很多個(gè)緩存行組成的。每個(gè)緩存行通常是64字節(jié),并且它有效地引用主內(nèi)存中的一塊兒地址。一個(gè)Java的long類型變量是8字節(jié),因此在一個(gè)緩存行中可以存8個(gè)long類型的變量。CPU每次從主存中拉取數(shù)據(jù)時(shí),會(huì)把相鄰的數(shù)據(jù)也存入同一個(gè)緩存行。在訪問一個(gè)long數(shù)組的時(shí)候,如果數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)自動(dòng)加載另外7個(gè)。因此你能非常快的遍歷這個(gè)數(shù)組。
?
「元素位置定位:」
數(shù)組長度2^n,通過位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
「無鎖設(shè)計(jì):」
每個(gè)生產(chǎn)者或者消費(fèi)者線程,會(huì)先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù),整個(gè)過程通過原子變量CAS,保證操作的線程安全
數(shù)據(jù)結(jié)構(gòu)
框架使用RingBuffer來作為隊(duì)列的數(shù)據(jù)結(jié)構(gòu),RingBuffer就是一個(gè)可自定義大小的環(huán)形數(shù)組。
除數(shù)組外還有一個(gè)序列號(hào)(sequence),用以指向下一個(gè)可用的元素,供生產(chǎn)者與消費(fèi)者使用。
原理圖如下所示:
Sequence
mark:Disruptor通過順序遞增的序號(hào)來編號(hào)管理通過其進(jìn)行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號(hào)逐個(gè)遞增處理。
「數(shù)組+序列號(hào)設(shè)計(jì)的優(yōu)勢是什么呢?」
回顧一下HashMap,在知道索引(index)下標(biāo)的情況下,存與取數(shù)組上的元素時(shí)間復(fù)雜度只有O(1),而這個(gè)index我們可以通過序列號(hào)與數(shù)組的長度取模來計(jì)算得出,index=sequence % table.length。當(dāng)然也可以用位運(yùn)算來計(jì)算效率更高,此時(shí)table.length必須是2的冪次方。
寫數(shù)據(jù)流程
單線程寫數(shù)據(jù)的流程:
- 申請寫入m個(gè)元素;
- 若是有m個(gè)元素可以入,則返回最大的序列號(hào)。這兒主要判斷是否會(huì)覆蓋未讀的元素;
- 若是返回的正確,則生產(chǎn)者開始寫入元素。
使用場景
經(jīng)過測試,Disruptor的的延時(shí)和吞吐量都比ArrayBlockingQueue優(yōu)秀很多,所以,當(dāng)你在使用ArrayBlockingQueue出現(xiàn)性能瓶頸的時(shí)候,你就可以考慮采用Disruptor的代替。
參考:github.com/LMAX-Exchan…
當(dāng)然,Disruptor性能高并不是必然的,所以,是否使用還得經(jīng)過測試。
Disruptor的最常用的場景就是“生產(chǎn)者-消費(fèi)者”場景,對場景的就是“一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者”的場景,并且要求順序處理。
舉個(gè)例子,我們從MySQL的BigLog文件中順序讀取數(shù)據(jù),然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個(gè)文件一個(gè)生產(chǎn)者,那個(gè)是一個(gè)生產(chǎn)者。而寫入到ElasticSearch,則嚴(yán)格要求順序,否則會(huì)出現(xiàn)問題,所以通常意義上的多消費(fèi)者線程無法解決該問題,如果通過加鎖,則性能大打折扣
作者:月伴飛魚
鏈接:https://juejin.im/post/6869795029800452103
來源:掘金