日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

Disruptor是什么?

Disruptor是一個高性能的異步處理框架,一個輕量級的JMS,和JDK中的BlockingQueue有相似處,但是它的處理速度非常快,獲得2011年程序框架創新大獎,號稱“一個線程一秒鐘可以處理600W個訂單”(這有點嚇人吧),并且Disruptor不僅僅只有buffer,它提供的功能非常強大,比如它可以幫助我們輕松構建數據流處理(比如一個數據先交給A和B這2個消費者并行處理后再交給C處理,是不是有點想起storm這種流處理,實際上strom的底層就是應用了disruptor來實現worker內部threads的通信)。本文將使用disruptor最新版3.3.6進行介紹,可以在https://github.com/LMAX-Exchange/disruptor/releases 下載最新的JAR包開始disruptor之旅吧。

輪胎:RingBuffer

RingBuffer,環形緩沖區,在disruptor中扮演著非常重要的角色,理解RingBuffer的結構有利于我們理解disruptor為什么這么快、無鎖的實現方式、生產者/消費者模式的實現細節。如下圖所示:

Java并發編程框架Disruptor

 

數組

這個類似于輪胎的東西實際上就是一個數組,使用數組的好處當然是由于預加載的緣故使得訪問比鏈表要快的多。

序號

RingBuffer中元素擁有序號的概念,并且序號是一直增長的,這怎么理解?比如RingBuffer大小為10,那么序號從0開始增長,當到9的時候,相當于轉了一圈,如果繼續增長的話,那么將覆蓋0號元素。也即是說通過 序號%SIZE 來定位元素,實現set/get操作。這里也發現了和隊列不同的一個方式,就是不存在元素的刪除操作,只有覆蓋而已,實際上RingBuffer的這種特別的環形存儲方式,使得不需要花費大量的時間用于內存清理/垃圾回收。

由于涉及到取模操作,為了CPU進行位運算更加高效,RingBuffer的大小應該是2的N次方。

無鎖的機制

在生產者/消費者模式下,disruptor號稱“無鎖并行框架”(要知道BlockingQueue是利用了Lock鎖機制來實現的),這是怎么做到的呢?下面我們來具體分析下:

一個生產者 + 一個消費者

生產者維護一個生產指針P,消費者維護一個消費者指針C,當然P和C本質上就是序號。2者各操作各的,不需要鎖,僅僅需要注意的是生產者和消費者的速度問題,當然這個在disruptor內部已經為我們做了處理,就是判斷一下P和C之間不能超過一圈的大小。

一個生產者 + 多個消費者

多個消費者當然持有多個消費指針C1,C2,...,消費者依據C進行各自讀取數據,只需要保證生產者的速度“協調”最慢的消費者的速度,就是那個不能超出一圈的概念。此時也不需要進行鎖定。

多個生產者 + N個消費者

很顯然,無論生產者有幾個,生產者指針P只能存在一個,否則數據就亂套了。那么多個生產者之間共享一個P指針,在disruptor中實際上是利用了CAS機制來保證多線程的數據安全,也沒有使用到鎖。

Disruptor初體驗:簡單的生產者和消費者

業務數據對象POJO(Event)

public class Order {

    //訂單ID
    private long id;

    //訂單信息
    private String info;

    //訂單價格
    private double price;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

業務數據工廠(Factory)

public class OrderFactory implements EventFactory{

    @Override
    public Object newInstance() {

        System.out.println("OrderFactory.newInstance");
        return new Order();
    }

}

事件處理器(Handler,即消費者處理邏輯)

public class OrderHandler implements EventHandler<Order>{

    @Override
    public void onEvent(Order order, long l, boolean b) throws Exception {

        System.out.println(Thread.currentThread().getName() + " 消費者處理中:" + l);
        order.setInfo("info" + order.getId());
        order.setPrice(Math.random());
    }

}

Main

public class Main {

    public static void main(String[] args) throws InterruptedException {

        //創建訂單工廠
        OrderFactory orderFactory = new OrderFactory();

        //ringbuffer的大小
        int RINGBUFFER_SIZE = 1024;

        //創建disruptor
        Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

        //設置事件處理器 即消費者
        disruptor.handleEventsWith(new OrderHandler());

        disruptor.start();

        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

        //-------------生產數據
        for(int i = 0 ; i < 3 ; i++){

            long sequence = ringBuffer.next();

            Order order = ringBuffer.get(sequence);

            order.setId(i);

            ringBuffer.publish(sequence);
            System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
        }

        Thread.sleep(1000);

        disruptor.shutdown();
    }

}

運行結果:

Java并發編程框架Disruptor

 

說明:

其實上面的結果已經很明顯的說明了,在初始階段構造Disruptor的時候,會調用工廠Factory去實例化RingBuffer中的Event數據對象。

另外在構造Disruptor的時候,在3.3.6之前使用的是API:

Java并發編程框架Disruptor

 

到了3.3.6這些API都不推薦使用了,即不再推薦傳入Executor這樣的線程池,而是推薦傳入ThreadFactory線程工廠。這樣的話,關閉disruptor就會自動關閉Executor線程池,而不需要像以前那樣必須在關閉disruptor的時候再關閉線程池了。

構造Disruptor時,需要注意ProducerType(SINGLE or MULTI 指示是單個生產者還是多個生產者模式)、WaitStrategy(策略選擇,決定了消費者如何等待生產者)。

Java并發編程框架Disruptor

 

單獨使用RingBuffer:WorkerPool

如果場景比較簡單,我們完全可以不用創建Disruptor,而是僅僅使用RingBuffer功能。

public static void main(String[] args) throws InterruptedException {

    ExecutorService executor = Executors.newFixedThreadPool(3);
    RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());
    WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());

    workerPool.start(executor);

    //-------------生產數據
    for(int i = 0 ; i < 30 ; i++){

        long sequence = ringBuffer.next();

        Order order = ringBuffer.get(sequence);
        order.setId(i);

        ringBuffer.publish(sequence);

        System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
    }

    Thread.sleep(1000);
    
    workerPool.halt();
    executor.shutdown();

}

實際上是利用WorkerPool輔助連接消費者。

一個生產者+多個消費者

Java并發編程框架Disruptor

 

public static void main(String[] args) throws InterruptedException {

    //創建訂單工廠
    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小
    int RINGBUFFER_SIZE = 1024;

    //創建disruptor
    Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //設置事件處理器 即消費者
    EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());
    eventHandlerGroup.then(new OrderHandler3());
    disruptor.start();

    RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

    //-------------生產數據
    for(int i = 0 ; i < 3 ; i++){

        long sequence = ringBuffer.next();

        Order order = ringBuffer.get(sequence);

        order.setId(i);

        ringBuffer.publish(sequence);
        System.out.println(Thread.currentThread().getName() + " 生產者發布一條數據:" + sequence + " 訂單ID:" + i);
    }

    Thread.sleep(1000);
    disruptor.shutdown();
}

運行結果:

Java并發編程框架Disruptor

 

生產者生產了3條消息,一個消費者線程1消費了這3條數據,另一個消費者線程2也消費了這3條數據,2者是并行的,待消費者線程1和2完畢后,3條數據交給消費者線程3處理。

如果我們想順序的按照A->B->C呢?

        disruptor.handleEventsWith(new Handler1()).
        	handleEventsWith(new Handler2()).
        	handleEventsWith(new Handler3());

如果我們想六邊形操作呢?

Java并發編程框架Disruptor

 

        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);

到這里相信你對Disruptor已經有所了解了,那么多個生產者多個消費者如何實現呢,其實和上面的代碼非常類似,無非是多個生產者都持有RingBuffer可以publish而已。

分享到:
標簽:Disruptor
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定