日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

上文中咱們簡單提到了JDK9中Flow接口中的靜態(tài)內(nèi)部類實現(xiàn)了響應(yīng)式流的JAVA API,并且提供了一個一個Publisher的實現(xiàn)類SubmissionPublisher。本文將先梳理一下接口中具體的處理流程,然后再以幾個調(diào)用者的例子來幫助大家理解。

JDK9中的實現(xiàn)

再放上一下上文中的響應(yīng)式流的交互流程:

  1. 訂閱者向發(fā)布者發(fā)送訂閱請求。
  2. 發(fā)布者根據(jù)訂閱請求生成令牌發(fā)送給訂閱者。
  3. 訂閱者根據(jù)令牌向發(fā)布者發(fā)送請求N個數(shù)據(jù)。
  4. 發(fā)送者根據(jù)訂閱者的請求數(shù)量返回M(M<=N)個數(shù)據(jù)
  5. 重復(fù)3,4
  6. 數(shù)據(jù)發(fā)送完畢后由發(fā)布者發(fā)送給訂閱者結(jié)束信號

該流程的角度是以接口調(diào)用的交互來說的,而考慮實際的coding工作中,我們的調(diào)用流程其實為:

  1. 創(chuàng)建發(fā)布者
  2. 創(chuàng)建訂閱者
  3. 訂閱令牌交互
  4. 發(fā)送信息

接下來我們按照這個流程來梳理一下代碼細(xì)節(jié)。

創(chuàng)建發(fā)布者

對于實現(xiàn)響應(yīng)流的最開始的步驟,便是創(chuàng)建一個發(fā)布者。之前提到在JDK9中提供了一個發(fā)布者的簡單實現(xiàn)SubmissionPublisher。SubmissionPublisher繼承自Flow.Publisher,他有三種構(gòu)造函數(shù):

    public SubmissionPublisher() {
        this(ASYNC_POOL, Flow.defaultBufferSize(), null);
    }
    
    public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
        this(executor, maxBufferCapacity, null);
    }

    public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                               BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)

SubmissionPublisher將使用Executor作為“線程池”向訂閱者發(fā)送信息。如果需要需要設(shè)置線程池的話可以自己傳入,否則的話再無參的構(gòu)造函數(shù)中將默認(rèn)使用ForkJoinPool類的commonPool()方法獲取,即無餐構(gòu)造方法中的ASYNC_POOL靜態(tài)變量。

SubmissionPublisher會為每一個訂閱者單獨的建立一個緩沖空間,其大小由入?yún)axBufferCapacity決定。默認(rèn)情況下直接使用Flow.defaultBufferSize()來設(shè)置,默認(rèn)為256。如果緩沖區(qū)滿了之后會根據(jù)發(fā)送信息時候的策略確定是阻塞等待還是拋棄數(shù)據(jù)。

SubmissionPublisher會在訂閱者發(fā)生異常的時候(onNext處理中),會調(diào)用最后一個參數(shù)handler方法,然后才會取消訂閱。默認(rèn)的時候為null,也就是不會處理異常。

最簡單的創(chuàng)建SubmissionPublisher的方法就是直接使用無參構(gòu)造方法:

SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

上文書說到,因為SubmissionPublisher實現(xiàn)了AutoCloseable接口,所以可以用try來進行資源回收可以省略close()的調(diào)用:

try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
}

但是也可以手動的調(diào)用close()方法來顯示的關(guān)閉發(fā)布者,關(guān)閉后再發(fā)送數(shù)據(jù)就會拋出異常:

if (complete)
    throw new IllegalStateException("Closed");

創(chuàng)建訂閱者

上文中咱們沒有手動創(chuàng)建訂閱者,而是直接調(diào)用SubmissionPublisher中的consume方法使用其內(nèi)部的訂閱者來消費消息。在本節(jié)可以實現(xiàn)接口Flow.Subscriber<T>創(chuàng)建一個SimpleSubscriber類:

public class SimpleSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    /**
     * 訂閱者名稱
     */
    private String name;
    /**
     * 定義最大消費數(shù)量
     */
    private final long maxCount;
    /**
     * 計數(shù)器
     */
    private long counter;
    public SimpleSubscriber(String name, long maxCount) {
        this.name = name;
        this.maxCount = maxCount <= 0 ? 1 : maxCount;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.printf("訂閱者:%s,最大消費數(shù)據(jù): %d。%n", name, maxCount);
        // 實際上是等于消費全部數(shù)據(jù)
        subscription.request(maxCount);
    }
    @Override
    public void onNext(Integer item) {
        counter++;
        System.out.printf("訂閱者:%s 接收到數(shù)據(jù):%d.%n", name, item);
        if (counter >= maxCount) {
            System.out.printf("準(zhǔn)備取消訂閱者: %s。已處理數(shù)據(jù)個數(shù):%d。%n", name, counter);
            // 處理完畢,取消訂閱
            subscription.cancel();
        }
    }
    @Override
    public void onError(Throwable t) {
        System.out.printf("訂閱者: %s,出現(xiàn)異常: %s。%n", name, t.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.printf("訂閱者: %s 處理完成。%n", name);
    }
}

SimpleSubscriber是一個簡單訂閱者類,其邏輯是根據(jù)構(gòu)造參數(shù)可以定義其名稱name與最大處理數(shù)據(jù)值maxCount,最少處理一個數(shù)據(jù)。

當(dāng)發(fā)布者進行一個訂閱的時候會生成一個令牌Subscription作為參數(shù)調(diào)用onSubscribe方法。在訂閱者需要捕獲該令牌作為后續(xù)與發(fā)布者交互的紐帶。一般來說在onSubscribe中至少調(diào)用一次request且參數(shù)需要>0,否則發(fā)布者將無法向訂閱者發(fā)送任何信息,這也是為什么maxCount需要大于0。

當(dāng)發(fā)布者開始發(fā)送數(shù)據(jù)后,會異步的調(diào)用onNext方法并將數(shù)據(jù)傳入。該類中使用了一個計數(shù)器對數(shù)據(jù)數(shù)量進行校驗,當(dāng)達到最大值的時候,則會通過令牌(subscription)異步通知發(fā)布者訂閱結(jié)束,然后發(fā)送者再異步的調(diào)用發(fā)訂閱者的onComplete方法,以處理完成流程。

其中的onError和onComplete方法只進行打印,這里就不再說了。

以上的這個訂閱者可以看作是一個push模型的實現(xiàn),因為當(dāng)開始訂閱時訂閱者就約定了需要接受的數(shù)量,然后在后續(xù)的處理(onNext)中不再請求新數(shù)據(jù)。

我們可以用以下的代碼創(chuàng)建一個名稱為S1,消費2個元素的訂閱者:

SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);

訂閱令牌交互

當(dāng)我們可以創(chuàng)建了發(fā)送者和訂閱者之后,我們需要確認(rèn)一下進行交互的順序,由于響應(yīng)流的處理就是對于事件的處理,所以事件的順序十分重要,具體順序如下:

  1. 我們創(chuàng)建一個發(fā)布者publisher一個訂閱者subscriber
  2. 訂閱者subscriber通過調(diào)用發(fā)布者的subscribe()方法進行信息訂閱。如果訂閱成功,則發(fā)布者將生成一個令牌(Subscription)并作為入?yún)⒄{(diào)用訂閱者的訂閱事件方法onSubscribe()。如果調(diào)用異常則會直接調(diào)用訂閱者的onError錯誤處理方法,并拋出IllegalStateException異常然后結(jié)束訂閱。
  3. 在onSubscribe()中,訂閱者需要通過調(diào)用令牌(Subscription)的請求方法request(long)來異步的向發(fā)布者請求數(shù)據(jù)。
  4. 當(dāng)發(fā)布者有數(shù)據(jù)可以發(fā)布的時候,則會異步的調(diào)用訂閱者的onNext()方法,直到所有消息的總數(shù)已經(jīng)滿足了訂閱者調(diào)用request的數(shù)據(jù)請求上限。所以當(dāng)訂閱者請求訂閱的消息數(shù)為Long.MAX_VALUE時,實際上是消費所有數(shù)據(jù),即push模式。如果發(fā)布者沒有數(shù)據(jù)要發(fā)布了,則可以會調(diào)用發(fā)布者自己的close()方法并異步的調(diào)用所有訂閱者的onComplete()方法來通知訂閱結(jié)束。
  5. 發(fā)布者可以隨時向發(fā)布者請求更多的元素請求(一般在onNext里),而不用等到之前的處理完畢,一般是與之前的數(shù)據(jù)數(shù)量進行累加。
  6. 放發(fā)布者遇到異常的時候會調(diào)用訂閱者的onError()方法。

上面的描述中是只使用的一個訂閱者來進行描述的,后面的例子中將說明發(fā)布者可以擁有多個訂閱者(甚至0個訂閱者)。

發(fā)送信息

當(dāng)發(fā)布者需要推送消息的時候會調(diào)用submit方法或者offer方法,上文中我們提到submit實際上是offer的一種簡單實現(xiàn),本節(jié)咱們自己比較一下。

首先他們的方法簽名為:

int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)

而submit 和 offer的直接方法為:

    public int submit(T item) {
        return doOffer(item, Long.MAX_VALUE, null);
    }
    
    public int offer(T item,
                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
    return doOffer(item, 0L, onDrop);

可以看到他們的底層調(diào)用的都是 doOffer 方法,而doOffer的方法簽名為:

    private int doOffer(T item, long nanos,
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop)

所以我們可以直接看doOffer()方法。doOffer()方法是可選阻塞時長的,而時長根據(jù)入?yún)?shù)nanos來決定。而onDrop()是一個刪除判斷器,如果調(diào)用BiPredicate的test()方法結(jié)果為true則會再次重試(根據(jù)令牌中的nextRetry屬性與發(fā)布器中的retryOffer()方法組合判斷,但是具體實現(xiàn)還沒梳理明白);如果結(jié)果為flase則直接刪除內(nèi)容。doOffer()返回的結(jié)果為正負(fù)兩種,正數(shù)的結(jié)果為發(fā)送了數(shù)據(jù),但是訂閱者還未消費的數(shù)據(jù)(估計值,因為是異步多線程的);如果為負(fù)數(shù),則返回的是重拾次數(shù)。

所以,根據(jù)submit()的參數(shù)我們可以發(fā)現(xiàn),submit會一直阻塞直到數(shù)據(jù)可以被消費(因為不會阻塞超時,所以不需要傳入onDrop()方法)。而我們可以根據(jù)需要配置offer()選擇器。如果必須要求數(shù)據(jù)都要被消費的話,那就可以直接選擇submit(),如果要設(shè)置重試次數(shù)的話就可以選擇使用offer()

異步調(diào)用的例子

下面看一個具體的程序例子,程序?qū)⒁?秒為周期進行數(shù)據(jù)發(fā)布:

public class PeriodicPublisher {

    public static final int WAIT_TIME = 2;
    public static final int SLEEP_TIME = 3;

    public static void main(String[] args) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        // 創(chuàng)建4訂閱者
        SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);
        SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);
        SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);
        SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);
        // 前三個訂閱者直接進行訂閱
        publisher.subscribe(subscriber1);
        publisher.subscribe(subscriber2);
        publisher.subscribe(subscriber3);
        // 第四個方法延遲訂閱
        delaySubscribeWithWaitTime(publisher, subscriber4);
        // 開始發(fā)送消息
        Thread pubThread = publish(publisher, 5);
        try {
            // 等待處理完成
            pubThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static Thread publish(SubmissionPublisher<Integer> publisher, int count) {
        Thread t = new Thread(() -> {
            IntStream.range(1,count)
                    .forEach(item ->{
                        publisher.submit(item);
                        sleep(item);
                    });
            publisher.close();
        });
        t.start();
        return t;
    }
    
    
    private static void sleep(Integer item) {
        try {
            System.out.printf("推送數(shù)據(jù):%d。休眠 3 秒。%n", item);
            TimeUnit.SECONDS.sleep(SLEEP_TIME);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private static void delaySubscribeWithWaitTime(SubmissionPublisher<Integer> publisher, Flow.Subscriber<Integer> sub) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(WAIT_TIME);
                publisher.subscribe(sub);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

代碼后是運行結(jié)果如下:

訂閱者:S1,最大消費數(shù)據(jù): 2。
推送數(shù)據(jù):1。休眠 3 秒。
訂閱者:S3,最大消費數(shù)據(jù): 6。
訂閱者:S2,最大消費數(shù)據(jù): 4。
訂閱者:S2 接收到數(shù)據(jù):1.
訂閱者:S3 接收到數(shù)據(jù):1.
訂閱者:S1 接收到數(shù)據(jù):1.
訂閱者:S4,最大消費數(shù)據(jù): 10。
推送數(shù)據(jù):2。休眠 3 秒。
訂閱者:S2 接收到數(shù)據(jù):2.
訂閱者:S3 接收到數(shù)據(jù):2.
訂閱者:S1 接收到數(shù)據(jù):2.
訂閱者:S4 接收到數(shù)據(jù):2.
準(zhǔn)備取消訂閱者: S1。已處理數(shù)據(jù)個數(shù):2。
推送數(shù)據(jù):3。休眠 3 秒。
訂閱者:S4 接收到數(shù)據(jù):3.
訂閱者:S2 接收到數(shù)據(jù):3.
訂閱者:S3 接收到數(shù)據(jù):3.
推送數(shù)據(jù):4。休眠 3 秒。
訂閱者:S4 接收到數(shù)據(jù):4.
訂閱者:S3 接收到數(shù)據(jù):4.
訂閱者:S2 接收到數(shù)據(jù):4.
準(zhǔn)備取消訂閱者: S2。已處理數(shù)據(jù)個數(shù):4。
推送數(shù)據(jù):5。休眠 3 秒。
訂閱者:S3 接收到數(shù)據(jù):5.
訂閱者:S4 接收到數(shù)據(jù):5.
訂閱者: S3 處理完成。
訂閱者: S4 處理完成。

由于是異步執(zhí)行,所以在“接收數(shù)據(jù)”部分的順序可能不同。

我們分析一下程序的執(zhí)行流程。

  • 創(chuàng)建一個發(fā)布者實例
  • 創(chuàng)建四個訂閱者實例S1、S2、S3、S4,可以接收數(shù)據(jù)的數(shù)量分別為:2、4、6、10。
  • 前三個訂閱者立即訂閱消息。
  • S4的訂閱者單獨創(chuàng)建一個線程等待WAIT_TIME秒(2秒)之后進行數(shù)據(jù)的訂閱。
  • 新建一個線程來以SLEEP_TIME秒(3秒)為間隔發(fā)布5個數(shù)據(jù)。
  • 將publish線程join()住等待流程結(jié)束。

執(zhí)行的日志滿足上述流程而針對一些關(guān)鍵點為:

  • S4在發(fā)送者推送數(shù)據(jù)"1"的時候還未訂閱,所以S4沒有接收到數(shù)據(jù)"1"。
  • 當(dāng)發(fā)送數(shù)據(jù)"2"的時候S1已經(jīng)接收夠了預(yù)期數(shù)據(jù)2個,所以取消了訂閱。之后只剩下S2、S3、S4。
  • 當(dāng)發(fā)送數(shù)據(jù)"4"的時候S2已經(jīng)接收夠了預(yù)期數(shù)據(jù)4個,所以取消了訂閱。之后只剩下S3、S4。
  • 當(dāng)發(fā)送數(shù)據(jù)"5"的時候只剩下S3、S4,當(dāng)發(fā)送完畢后publisher調(diào)用close()方法,通知S3、S4數(shù)據(jù)處理完成。

需要注意的是,如果在最后submit完畢之后直接close()然后結(jié)束進行的話可能訂閱者并不能執(zhí)行完畢。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以執(zhí)行完畢的。

最后

本文中的例子是是簡單的實現(xiàn),可以通過調(diào)整訂閱者中的request的參數(shù),與在onNext中添加request調(diào)用來測試背壓的效果,還可以將submit調(diào)整為offer并添加onDrop方法以觀察拋棄信息時的流程。同時本文沒有提供Processor的例子,各位也可以自行學(xué)習(xí)。

總結(jié)一下流程: 訂閱者向發(fā)布者進行訂閱,然后發(fā)布者向訂閱者發(fā)送令牌。訂閱者使用令牌請求消息,發(fā)送者根據(jù)請求消息的數(shù)量推送消息。訂閱者可以隨時異步追加需要的更多信息。

JDK9中在Flow接口中實現(xiàn)了Java API的4個接口,并提供了SubmissionPublisher<T>作為Publisher<T>接口的簡單實現(xiàn)。

分享到:
標(biāo)簽:JDK
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定