上文中咱們簡單提到了JDK9中Flow接口中的靜態(tài)內(nèi)部類實現(xiàn)了響應(yīng)式流的JAVA API,并且提供了一個一個Publisher的實現(xiàn)類SubmissionPublisher。本文將先梳理一下接口中具體的處理流程,然后再以幾個調(diào)用者的例子來幫助大家理解。
JDK9中的實現(xiàn)
再放上一下上文中的響應(yīng)式流的交互流程:
- 訂閱者向發(fā)布者發(fā)送訂閱請求。
- 發(fā)布者根據(jù)訂閱請求生成令牌發(fā)送給訂閱者。
- 訂閱者根據(jù)令牌向發(fā)布者發(fā)送請求N個數(shù)據(jù)。
- 發(fā)送者根據(jù)訂閱者的請求數(shù)量返回M(M<=N)個數(shù)據(jù)
- 重復(fù)3,4
- 數(shù)據(jù)發(fā)送完畢后由發(fā)布者發(fā)送給訂閱者結(jié)束信號
該流程的角度是以接口調(diào)用的交互來說的,而考慮實際的coding工作中,我們的調(diào)用流程其實為:
- 創(chuàng)建發(fā)布者
- 創(chuàng)建訂閱者
- 訂閱令牌交互
- 發(fā)送信息
接下來我們按照這個流程來梳理一下代碼細(xì)節(jié)。
創(chuàng)建發(fā)布者
對于實現(xiàn)響應(yīng)流的最開始的步驟,便是創(chuàng)建一個發(fā)布者。之前提到在JDK9中提供了一個發(fā)布者的簡單實現(xiàn)SubmissionPublisher。SubmissionPublisher繼承自Flow.Publisher,他有三種構(gòu)造函數(shù):
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
this(executor, maxBufferCapacity, null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)
SubmissionPublisher將使用Executor作為“線程池”向訂閱者發(fā)送信息。如果需要需要設(shè)置線程池的話可以自己傳入,否則的話再無參的構(gòu)造函數(shù)中將默認(rèn)使用ForkJoinPool類的commonPool()方法獲取,即無餐構(gòu)造方法中的ASYNC_POOL靜態(tài)變量。
SubmissionPublisher會為每一個訂閱者單獨的建立一個緩沖空間,其大小由入?yún)axBufferCapacity決定。默認(rèn)情況下直接使用Flow.defaultBufferSize()來設(shè)置,默認(rèn)為256。如果緩沖區(qū)滿了之后會根據(jù)發(fā)送信息時候的策略確定是阻塞等待還是拋棄數(shù)據(jù)。
SubmissionPublisher會在訂閱者發(fā)生異常的時候(onNext處理中),會調(diào)用最后一個參數(shù)handler方法,然后才會取消訂閱。默認(rèn)的時候為null,也就是不會處理異常。
最簡單的創(chuàng)建SubmissionPublisher的方法就是直接使用無參構(gòu)造方法:
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
上文書說到,因為SubmissionPublisher實現(xiàn)了AutoCloseable接口,所以可以用try來進行資源回收可以省略close()的調(diào)用:
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
}
但是也可以手動的調(diào)用close()方法來顯示的關(guān)閉發(fā)布者,關(guān)閉后再發(fā)送數(shù)據(jù)就會拋出異常:
if (complete)
throw new IllegalStateException("Closed");
創(chuàng)建訂閱者
上文中咱們沒有手動創(chuàng)建訂閱者,而是直接調(diào)用SubmissionPublisher中的consume方法使用其內(nèi)部的訂閱者來消費消息。在本節(jié)可以實現(xiàn)接口Flow.Subscriber<T>創(chuàng)建一個SimpleSubscriber類:
public class SimpleSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
/**
* 訂閱者名稱
*/
private String name;
/**
* 定義最大消費數(shù)量
*/
private final long maxCount;
/**
* 計數(shù)器
*/
private long counter;
public SimpleSubscriber(String name, long maxCount) {
this.name = name;
this.maxCount = maxCount <= 0 ? 1 : maxCount;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.printf("訂閱者:%s,最大消費數(shù)據(jù): %d。%n", name, maxCount);
// 實際上是等于消費全部數(shù)據(jù)
subscription.request(maxCount);
}
@Override
public void onNext(Integer item) {
counter++;
System.out.printf("訂閱者:%s 接收到數(shù)據(jù):%d.%n", name, item);
if (counter >= maxCount) {
System.out.printf("準(zhǔn)備取消訂閱者: %s。已處理數(shù)據(jù)個數(shù):%d。%n", name, counter);
// 處理完畢,取消訂閱
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.printf("訂閱者: %s,出現(xiàn)異常: %s。%n", name, t.getMessage());
}
@Override
public void onComplete() {
System.out.printf("訂閱者: %s 處理完成。%n", name);
}
}
SimpleSubscriber是一個簡單訂閱者類,其邏輯是根據(jù)構(gòu)造參數(shù)可以定義其名稱name與最大處理數(shù)據(jù)值maxCount,最少處理一個數(shù)據(jù)。
當(dāng)發(fā)布者進行一個訂閱的時候會生成一個令牌Subscription作為參數(shù)調(diào)用onSubscribe方法。在訂閱者需要捕獲該令牌作為后續(xù)與發(fā)布者交互的紐帶。一般來說在onSubscribe中至少調(diào)用一次request且參數(shù)需要>0,否則發(fā)布者將無法向訂閱者發(fā)送任何信息,這也是為什么maxCount需要大于0。
當(dāng)發(fā)布者開始發(fā)送數(shù)據(jù)后,會異步的調(diào)用onNext方法并將數(shù)據(jù)傳入。該類中使用了一個計數(shù)器對數(shù)據(jù)數(shù)量進行校驗,當(dāng)達到最大值的時候,則會通過令牌(subscription)異步通知發(fā)布者訂閱結(jié)束,然后發(fā)送者再異步的調(diào)用發(fā)訂閱者的onComplete方法,以處理完成流程。
其中的onError和onComplete方法只進行打印,這里就不再說了。
以上的這個訂閱者可以看作是一個push模型的實現(xiàn),因為當(dāng)開始訂閱時訂閱者就約定了需要接受的數(shù)量,然后在后續(xù)的處理(onNext)中不再請求新數(shù)據(jù)。
我們可以用以下的代碼創(chuàng)建一個名稱為S1,消費2個元素的訂閱者:
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
訂閱令牌交互
當(dāng)我們可以創(chuàng)建了發(fā)送者和訂閱者之后,我們需要確認(rèn)一下進行交互的順序,由于響應(yīng)流的處理就是對于事件的處理,所以事件的順序十分重要,具體順序如下:
- 我們創(chuàng)建一個發(fā)布者publisher一個訂閱者subscriber
- 訂閱者subscriber通過調(diào)用發(fā)布者的subscribe()方法進行信息訂閱。如果訂閱成功,則發(fā)布者將生成一個令牌(Subscription)并作為入?yún)⒄{(diào)用訂閱者的訂閱事件方法onSubscribe()。如果調(diào)用異常則會直接調(diào)用訂閱者的onError錯誤處理方法,并拋出IllegalStateException異常然后結(jié)束訂閱。
- 在onSubscribe()中,訂閱者需要通過調(diào)用令牌(Subscription)的請求方法request(long)來異步的向發(fā)布者請求數(shù)據(jù)。
- 當(dāng)發(fā)布者有數(shù)據(jù)可以發(fā)布的時候,則會異步的調(diào)用訂閱者的onNext()方法,直到所有消息的總數(shù)已經(jīng)滿足了訂閱者調(diào)用request的數(shù)據(jù)請求上限。所以當(dāng)訂閱者請求訂閱的消息數(shù)為Long.MAX_VALUE時,實際上是消費所有數(shù)據(jù),即push模式。如果發(fā)布者沒有數(shù)據(jù)要發(fā)布了,則可以會調(diào)用發(fā)布者自己的close()方法并異步的調(diào)用所有訂閱者的onComplete()方法來通知訂閱結(jié)束。
- 發(fā)布者可以隨時向發(fā)布者請求更多的元素請求(一般在onNext里),而不用等到之前的處理完畢,一般是與之前的數(shù)據(jù)數(shù)量進行累加。
- 放發(fā)布者遇到異常的時候會調(diào)用訂閱者的onError()方法。
上面的描述中是只使用的一個訂閱者來進行描述的,后面的例子中將說明發(fā)布者可以擁有多個訂閱者(甚至0個訂閱者)。
發(fā)送信息
當(dāng)發(fā)布者需要推送消息的時候會調(diào)用submit方法或者offer方法,上文中我們提到submit實際上是offer的一種簡單實現(xiàn),本節(jié)咱們自己比較一下。
首先他們的方法簽名為:
int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)
而submit 和 offer的直接方法為:
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
public int offer(T item,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
return doOffer(item, 0L, onDrop);
可以看到他們的底層調(diào)用的都是 doOffer 方法,而doOffer的方法簽名為:
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop)
所以我們可以直接看doOffer()方法。doOffer()方法是可選阻塞時長的,而時長根據(jù)入?yún)?shù)nanos來決定。而onDrop()是一個刪除判斷器,如果調(diào)用BiPredicate的test()方法結(jié)果為true則會再次重試(根據(jù)令牌中的nextRetry屬性與發(fā)布器中的retryOffer()方法組合判斷,但是具體實現(xiàn)還沒梳理明白);如果結(jié)果為flase則直接刪除內(nèi)容。doOffer()返回的結(jié)果為正負(fù)兩種,正數(shù)的結(jié)果為發(fā)送了數(shù)據(jù),但是訂閱者還未消費的數(shù)據(jù)(估計值,因為是異步多線程的);如果為負(fù)數(shù),則返回的是重拾次數(shù)。
所以,根據(jù)submit()的參數(shù)我們可以發(fā)現(xiàn),submit會一直阻塞直到數(shù)據(jù)可以被消費(因為不會阻塞超時,所以不需要傳入onDrop()方法)。而我們可以根據(jù)需要配置offer()選擇器。如果必須要求數(shù)據(jù)都要被消費的話,那就可以直接選擇submit(),如果要設(shè)置重試次數(shù)的話就可以選擇使用offer()
異步調(diào)用的例子
下面看一個具體的程序例子,程序?qū)⒁?秒為周期進行數(shù)據(jù)發(fā)布:
public class PeriodicPublisher {
public static final int WAIT_TIME = 2;
public static final int SLEEP_TIME = 3;
public static void main(String[] args) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
// 創(chuàng)建4訂閱者
SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);
SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);
SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);
SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);
// 前三個訂閱者直接進行訂閱
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
publisher.subscribe(subscriber3);
// 第四個方法延遲訂閱
delaySubscribeWithWaitTime(publisher, subscriber4);
// 開始發(fā)送消息
Thread pubThread = publish(publisher, 5);
try {
// 等待處理完成
pubThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Thread publish(SubmissionPublisher<Integer> publisher, int count) {
Thread t = new Thread(() -> {
IntStream.range(1,count)
.forEach(item ->{
publisher.submit(item);
sleep(item);
});
publisher.close();
});
t.start();
return t;
}
private static void sleep(Integer item) {
try {
System.out.printf("推送數(shù)據(jù):%d。休眠 3 秒。%n", item);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void delaySubscribeWithWaitTime(SubmissionPublisher<Integer> publisher, Flow.Subscriber<Integer> sub) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(WAIT_TIME);
publisher.subscribe(sub);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
代碼后是運行結(jié)果如下:
訂閱者:S1,最大消費數(shù)據(jù): 2。
推送數(shù)據(jù):1。休眠 3 秒。
訂閱者:S3,最大消費數(shù)據(jù): 6。
訂閱者:S2,最大消費數(shù)據(jù): 4。
訂閱者:S2 接收到數(shù)據(jù):1.
訂閱者:S3 接收到數(shù)據(jù):1.
訂閱者:S1 接收到數(shù)據(jù):1.
訂閱者:S4,最大消費數(shù)據(jù): 10。
推送數(shù)據(jù):2。休眠 3 秒。
訂閱者:S2 接收到數(shù)據(jù):2.
訂閱者:S3 接收到數(shù)據(jù):2.
訂閱者:S1 接收到數(shù)據(jù):2.
訂閱者:S4 接收到數(shù)據(jù):2.
準(zhǔn)備取消訂閱者: S1。已處理數(shù)據(jù)個數(shù):2。
推送數(shù)據(jù):3。休眠 3 秒。
訂閱者:S4 接收到數(shù)據(jù):3.
訂閱者:S2 接收到數(shù)據(jù):3.
訂閱者:S3 接收到數(shù)據(jù):3.
推送數(shù)據(jù):4。休眠 3 秒。
訂閱者:S4 接收到數(shù)據(jù):4.
訂閱者:S3 接收到數(shù)據(jù):4.
訂閱者:S2 接收到數(shù)據(jù):4.
準(zhǔn)備取消訂閱者: S2。已處理數(shù)據(jù)個數(shù):4。
推送數(shù)據(jù):5。休眠 3 秒。
訂閱者:S3 接收到數(shù)據(jù):5.
訂閱者:S4 接收到數(shù)據(jù):5.
訂閱者: S3 處理完成。
訂閱者: S4 處理完成。
由于是異步執(zhí)行,所以在“接收數(shù)據(jù)”部分的順序可能不同。
我們分析一下程序的執(zhí)行流程。
- 創(chuàng)建一個發(fā)布者實例
- 創(chuàng)建四個訂閱者實例S1、S2、S3、S4,可以接收數(shù)據(jù)的數(shù)量分別為:2、4、6、10。
- 前三個訂閱者立即訂閱消息。
- S4的訂閱者單獨創(chuàng)建一個線程等待WAIT_TIME秒(2秒)之后進行數(shù)據(jù)的訂閱。
- 新建一個線程來以SLEEP_TIME秒(3秒)為間隔發(fā)布5個數(shù)據(jù)。
- 將publish線程join()住等待流程結(jié)束。
執(zhí)行的日志滿足上述流程而針對一些關(guān)鍵點為:
- S4在發(fā)送者推送數(shù)據(jù)"1"的時候還未訂閱,所以S4沒有接收到數(shù)據(jù)"1"。
- 當(dāng)發(fā)送數(shù)據(jù)"2"的時候S1已經(jīng)接收夠了預(yù)期數(shù)據(jù)2個,所以取消了訂閱。之后只剩下S2、S3、S4。
- 當(dāng)發(fā)送數(shù)據(jù)"4"的時候S2已經(jīng)接收夠了預(yù)期數(shù)據(jù)4個,所以取消了訂閱。之后只剩下S3、S4。
- 當(dāng)發(fā)送數(shù)據(jù)"5"的時候只剩下S3、S4,當(dāng)發(fā)送完畢后publisher調(diào)用close()方法,通知S3、S4數(shù)據(jù)處理完成。
需要注意的是,如果在最后submit完畢之后直接close()然后結(jié)束進行的話可能訂閱者并不能執(zhí)行完畢。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以執(zhí)行完畢的。
最后
本文中的例子是是簡單的實現(xiàn),可以通過調(diào)整訂閱者中的request的參數(shù),與在onNext中添加request調(diào)用來測試背壓的效果,還可以將submit調(diào)整為offer并添加onDrop方法以觀察拋棄信息時的流程。同時本文沒有提供Processor的例子,各位也可以自行學(xué)習(xí)。
總結(jié)一下流程: 訂閱者向發(fā)布者進行訂閱,然后發(fā)布者向訂閱者發(fā)送令牌。訂閱者使用令牌請求消息,發(fā)送者根據(jù)請求消息的數(shù)量推送消息。訂閱者可以隨時異步追加需要的更多信息。
JDK9中在Flow接口中實現(xiàn)了Java API的4個接口,并提供了SubmissionPublisher<T>作為Publisher<T>接口的簡單實現(xiàn)。