Disruptor是什么?
Disruptor是一個高性能的異步處理框架,一個輕量級的JMS,和JDK中的BlockingQueue有相似處,但是它的處理速度非常快,獲得2011年程序框架創新大獎,號稱“一個線程一秒鐘可以處理600W個訂單”(這有點嚇人吧),并且Disruptor不僅僅只有buffer,它提供的功能非常強大,比如它可以幫助我們輕松構建數據流處理(比如一個數據先交給A和B這2個消費者并行處理后再交給C處理,是不是有點想起storm這種流處理,實際上strom的底層就是應用了disruptor來實現worker內部threads的通信)。本文將使用disruptor最新版3.3.6進行介紹,可以在https://github.com/LMAX-Exchange/disruptor/releases 下載最新的JAR包開始disruptor之旅吧。
輪胎:RingBuffer
RingBuffer,環形緩沖區,在disruptor中扮演著非常重要的角色,理解RingBuffer的結構有利于我們理解disruptor為什么這么快、無鎖的實現方式、生產者/消費者模式的實現細節。如下圖所示:
數組
這個類似于輪胎的東西實際上就是一個數組,使用數組的好處當然是由于預加載的緣故使得訪問比鏈表要快的多。
序號
RingBuffer中元素擁有序號的概念,并且序號是一直增長的,這怎么理解?比如RingBuffer大小為10,那么序號從0開始增長,當到9的時候,相當于轉了一圈,如果繼續增長的話,那么將覆蓋0號元素。也即是說通過 序號%SIZE 來定位元素,實現set/get操作。這里也發現了和隊列不同的一個方式,就是不存在元素的刪除操作,只有覆蓋而已,實際上RingBuffer的這種特別的環形存儲方式,使得不需要花費大量的時間用于內存清理/垃圾回收。
由于涉及到取模操作,為了CPU進行位運算更加高效,RingBuffer的大小應該是2的N次方。
無鎖的機制
在生產者/消費者模式下,disruptor號稱“無鎖并行框架”(要知道BlockingQueue是利用了Lock鎖機制來實現的),這是怎么做到的呢?下面我們來具體分析下:
一個生產者 + 一個消費者
生產者維護一個生產指針P,消費者維護一個消費者指針C,當然P和C本質上就是序號。2者各操作各的,不需要鎖,僅僅需要注意的是生產者和消費者的速度問題,當然這個在disruptor內部已經為我們做了處理,就是判斷一下P和C之間不能超過一圈的大小。
一個生產者 + 多個消費者
多個消費者當然持有多個消費指針C1,C2,...,消費者依據C進行各自讀取數據,只需要保證生產者的速度“協調”最慢的消費者的速度,就是那個不能超出一圈的概念。此時也不需要進行鎖定。
多個生產者 + N個消費者
很顯然,無論生產者有幾個,生產者指針P只能存在一個,否則數據就亂套了。那么多個生產者之間共享一個P指針,在disruptor中實際上是利用了CAS機制來保證多線程的數據安全,也沒有使用到鎖。
Disruptor初體驗:簡單的生產者和消費者
業務數據對象POJO(Event)
public class Order {
//訂單ID
private long id;
//訂單信息
private String info;
//訂單價格
private double price;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
業務數據工廠(Factory)
public class OrderFactory implements EventFactory{
@Override
public Object newInstance() {
System.out.println("OrderFactory.newInstance");
return new Order();
}
}
事件處理器(Handler,即消費者處理邏輯)
public class OrderHandler implements EventHandler<Order>{
@Override
public void onEvent(Order order, long l, boolean b) throws Exception {
System.out.println(Thread.currentThread().getName() + " 消費者處理中:" + l);
order.setInfo("info" + order.getId());
order.setPrice(Math.random());
}
}
Main
public class Main {
public static void main(String[] args) throws InterruptedException {
//創建訂單工廠
OrderFactory orderFactory = new OrderFactory();
//ringbuffer的大小
int RINGBUFFER_SIZE = 1024;
//創建disruptor
Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
//設置事件處理器 即消費者
disruptor.handleEventsWith(new OrderHandler());
disruptor.start();
RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
//-------------生產數據
for(int i = 0 ; i < 3 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
}
Thread.sleep(1000);
disruptor.shutdown();
}
}
運行結果:
說明:
其實上面的結果已經很明顯的說明了,在初始階段構造Disruptor的時候,會調用工廠Factory去實例化RingBuffer中的Event數據對象。
另外在構造Disruptor的時候,在3.3.6之前使用的是API:
到了3.3.6這些API都不推薦使用了,即不再推薦傳入Executor這樣的線程池,而是推薦傳入ThreadFactory線程工廠。這樣的話,關閉disruptor就會自動關閉Executor線程池,而不需要像以前那樣必須在關閉disruptor的時候再關閉線程池了。
構造Disruptor時,需要注意ProducerType(SINGLE or MULTI 指示是單個生產者還是多個生產者模式)、WaitStrategy(策略選擇,決定了消費者如何等待生產者)。
單獨使用RingBuffer:WorkerPool
如果場景比較簡單,我們完全可以不用創建Disruptor,而是僅僅使用RingBuffer功能。
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());
WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());
workerPool.start(executor);
//-------------生產數據
for(int i = 0 ; i < 30 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
}
Thread.sleep(1000);
workerPool.halt();
executor.shutdown();
}
實際上是利用WorkerPool輔助連接消費者。
一個生產者+多個消費者
public static void main(String[] args) throws InterruptedException {
//創建訂單工廠
OrderFactory orderFactory = new OrderFactory();
//ringbuffer的大小
int RINGBUFFER_SIZE = 1024;
//創建disruptor
Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
//設置事件處理器 即消費者
EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());
eventHandlerGroup.then(new OrderHandler3());
disruptor.start();
RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
//-------------生產數據
for(int i = 0 ; i < 3 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
}
Thread.sleep(1000);
disruptor.shutdown();
}
運行結果:
生產者生產了3條消息,一個消費者線程1消費了這3條數據,另一個消費者線程2也消費了這3條數據,2者是并行的,待消費者線程1和2完畢后,3條數據交給消費者線程3處理。
如果我們想順序的按照A->B->C呢?
disruptor.handleEventsWith(new Handler1()).
handleEventsWith(new Handler2()).
handleEventsWith(new Handler3());
如果我們想六邊形操作呢?
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h2);
disruptor.after(h1).handleEventsWith(h4);
disruptor.after(h2).handleEventsWith(h5);
disruptor.after(h4, h5).handleEventsWith(h3);
到這里相信你對Disruptor已經有所了解了,那么多個生產者多個消費者如何實現呢,其實和上面的代碼非常類似,無非是多個生產者都持有RingBuffer可以publish而已。