本文介紹了如何使用生成返回Mono的包裝調(diào)用來(lái)創(chuàng)建Flux的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我有一個(gè)例子,我想使用Flos.Generate,因?yàn)槲也幌脒M(jìn)行昂貴的阻塞調(diào)用,除非/直到訂閱者請(qǐng)求它。具體地說(shuō),我會(huì)多次調(diào)用Elasticearch(有效地進(jìn)行分頁(yè)),直到?jīng)]有更多的匹配。我已經(jīng)使用Iterator<SearchResponse>
中的標(biāo)準(zhǔn)阻塞調(diào)用實(shí)現(xiàn)了這一點(diǎn)。其中,對(duì)生成的lambda塊和通量的每次調(diào)用都以.subscribeOn(Schedulers.boundedElastic())
結(jié)束。但是,我想使用Spring的ReactiveElasticsearchClient
,它返回一個(gè)Mono<SearchResponse>
,但仍然希望一次返回一個(gè)。
以下是使用阻止的前一段代碼:
public Iterator<SearchResponse> createDeepQueryIterator(@NonNull PITSearchInput input){
return new PointInTimeIterator(elasticClient, input);
}
public Flux<SearchResponse> createDeepQueryFlux(@NonNull PITSearchInput input){
return Flux.<SearchResponse, PointInTimeIterator>generate(
() -> new PointInTimeIterator(elasticClient, input),
(deepQueryIterator, sink) -> {
if (deepQueryIterator.hasNext()) {
sink.next(deepQueryIterator.next());
}else{
sink.complete();
}
return deepQueryIterator;
},
(deepQueryIterator) -> deepQueryIterator.shutdown())
.subscribeOn(Schedulers.boundedElastic());
}
上面的工作很好,因?yàn)樗鼤?huì)等待對(duì)ES的下一次調(diào)用,直到訂戶準(zhǔn)備好接收下一塊數(shù)據(jù)。
在下面,我嘗試使用Spring的ReactiveElasticsearchClient
,但問(wèn)題是在訂閱者處理第一個(gè)調(diào)用之前,對(duì)ES進(jìn)行了多次調(diào)用。
public Flux<SearchResponse> createDeepQuery(PointInTimeIteratorFactory.PITSearchInput input) {
log.info("Creating flux");
AtomicReference<PitId> pitId = new AtomicReference<>();
AtomicInteger count = new AtomicInteger();
Mono<PitId> pitIdMono =
Mono.fromCallable(
() -> {
pitId.set(createPIT(input));
return pitId.get();
})
.subscribeOn(Schedulers.boundedElastic());
Mono<SearchResponse> searchResponseMono =
pitIdMono.flatMap(
p -> {
log.info("Calling search");
return reactiveElasticsearchClient.searchForResponse(createSearchRequestFrom(p, input));
});
Flux<SearchResponse> expand =
searchResponseMono
.expand(
(searchResponse -> {
int hitCount = searchResponse.getHits().getHits().length;
count.addAndGet(hitCount);
log.info("Previous returned {} hits totaling {}", hitCount, count.get());
if (count.get() > input.getMaxTotalSize()
|| hitCount < input.getMaxSizePerQuery()){
log.info("Returning empty");
return Mono.empty();
}
log.info("Calling search");
pitId.set(new PitId(searchResponse.pointInTimeId()));
return reactiveElasticsearchClient.searchForResponse(
createSearchRequestFrom(searchResponse, input));
}))
.doFinally(
p -> {
deletePIT(pitId.get());
});
return expand;
}
因此,問(wèn)題不是使用被動(dòng)客戶端在Flux
中返回Mono<SearchResponse>
的能力,而是根據(jù)訂閱者的需要一次只返回一個(gè)。
下面是上面的Flux->Mono方法的日志記錄,PitTest
日志記錄來(lái)自通量的測(cè)試onNext()。
2021-12-02 13:13:37.300 INFO 13704 --- [ main] a.a.t.ReactivePointInTimeIteratorFactory : Creating flux
2021-12-02 13:13:37.346 INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Creating PIT
2021-12-02 13:13:37.407 INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.176 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 50
2021-12-02 13:13:38.177 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.177 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877306267
2021-12-02 13:13:38.228 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 100
2021-12-02 13:13:38.228 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.228 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606162
2021-12-02 13:13:38.271 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 150
2021-12-02 13:13:38.271 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.272 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606362
2021-12-02 13:13:38.311 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 200
2021-12-02 13:13:38.312 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.312 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877906244
2021-12-02 13:13:38.344 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 250
2021-12-02 13:13:38.345 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Returning empty
2021-12-02 13:13:38.345 INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Closing PIT ReactivePointInTimeIteratorFactory.PitId(id=m_2xAwENYWN0aXZpdHlzdG9yZRZQQkRGWldmclI2cWZITEpoWDI1cGlRABZCZU8xbm55ZlFabXREYmNEdThESG1RAAAAAAAAWQcTFm5BcXdPU2xTUWE2bEU4dkVPVkpkWFEBFlBCREZaV2ZyUjZxZkhMSmhYMjVwaVEAAA==)
2021-12-02 13:13:40.171 INFO 13704 --- [ parallel-1] p.actss.activity.store.PitTest : [1634877306066]
2021-12-02 13:13:42.172 INFO 13704 --- [ parallel-2] p.actss.activity.store.PitTest : [1634877306272]
2021-12-02 13:13:44.172 INFO 13704 --- [ parallel-3] p.actss.activity.store.PitTest : [1634877606166]
2021-12-02 13:13:46.173 INFO 13704 --- [ parallel-4] p.actss.activity.store.PitTest : [1634877906057]
2021-12-02 13:13:48.174 INFO 13704 --- [ parallel-1] p.actss.activity.store.PitTest : [1634877906248]
2021-12-02 13:13:48.174 INFO 13704 --- [ parallel-1] p.actss.activity.store.PitTest : Complete
2021-12-02 13:13:48.174 INFO 13704 --- [ main] p.actss.activity.store.PitTest : blah
2021-12-02 13:13:48.175 INFO 13704 --- [ parallel-1] p.actss.activity.store.PitTest : onComplete
更新:添加PitTest代碼以確保完整性:
@Test
void testReactoiveFluxIt() throws InterruptedException {
Flux<SearchResponse> deepQuery = reactivePointInTimeIteratorFactory.createDeepQuery(...);
deepQuery
.delayElements(Duration.ofMillis(2000))
.doOnNext(p -> log.info(Arrays.toString(p.getHits().getHits()[0].getSortValues()))) //
.doOnComplete(() -> log.info("Complete")) //
.doFinally(p -> log.info(p.toString()))
.blockLast();
log.info("blah");
Thread.sleep(5000);
}
推薦答案
delayElements
切換到并行調(diào)度程序并將每個(gè)發(fā)出的元素延遲2秒。這就是以后打印排序值的原因。
這篇關(guān)于如何使用生成返回Mono的包裝調(diào)用來(lái)創(chuàng)建Flux的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,