波多野结衣 蜜桃视频,国产在线精品露脸ponn,a v麻豆成人,AV在线免费小电影

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本文介紹了如何使用生成返回Mono的包裝調(diào)用來(lái)創(chuàng)建Flux的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我有一個(gè)例子,我想使用Flos.Generate,因?yàn)槲也幌脒M(jìn)行昂貴的阻塞調(diào)用,除非/直到訂閱者請(qǐng)求它。具體地說(shuō),我會(huì)多次調(diào)用Elasticearch(有效地進(jìn)行分頁(yè)),直到?jīng)]有更多的匹配。我已經(jīng)使用Iterator<SearchResponse>中的標(biāo)準(zhǔn)阻塞調(diào)用實(shí)現(xiàn)了這一點(diǎn)。其中,對(duì)生成的lambda塊和通量的每次調(diào)用都以.subscribeOn(Schedulers.boundedElastic())結(jié)束。但是,我想使用Spring的ReactiveElasticsearchClient,它返回一個(gè)Mono<SearchResponse>,但仍然希望一次返回一個(gè)。

以下是使用阻止的前一段代碼:


  public Iterator<SearchResponse> createDeepQueryIterator(@NonNull PITSearchInput input){
    return new PointInTimeIterator(elasticClient, input);
  }

  public Flux<SearchResponse> createDeepQueryFlux(@NonNull PITSearchInput input){
    return Flux.<SearchResponse, PointInTimeIterator>generate(
            () -> new PointInTimeIterator(elasticClient, input),
            (deepQueryIterator, sink) -> {
              if (deepQueryIterator.hasNext()) {
                sink.next(deepQueryIterator.next());
              }else{
                sink.complete();
              }
              return deepQueryIterator;
            },
            (deepQueryIterator) -> deepQueryIterator.shutdown())
        .subscribeOn(Schedulers.boundedElastic());
  }

上面的工作很好,因?yàn)樗鼤?huì)等待對(duì)ES的下一次調(diào)用,直到訂戶準(zhǔn)備好接收下一塊數(shù)據(jù)。

在下面,我嘗試使用Spring的ReactiveElasticsearchClient,但問(wèn)題是在訂閱者處理第一個(gè)調(diào)用之前,對(duì)ES進(jìn)行了多次調(diào)用。


  public Flux<SearchResponse> createDeepQuery(PointInTimeIteratorFactory.PITSearchInput input) {
    log.info("Creating flux");

    AtomicReference<PitId> pitId = new AtomicReference<>();
    AtomicInteger count = new AtomicInteger();

    Mono<PitId> pitIdMono =
        Mono.fromCallable(
            () -> {
              pitId.set(createPIT(input));
              return pitId.get();
            })
        .subscribeOn(Schedulers.boundedElastic());
    Mono<SearchResponse> searchResponseMono =
        pitIdMono.flatMap(
            p -> {
              log.info("Calling search");
              return reactiveElasticsearchClient.searchForResponse(createSearchRequestFrom(p, input));
            });
    Flux<SearchResponse> expand =
        searchResponseMono
            .expand(
                (searchResponse -> {
                  int hitCount = searchResponse.getHits().getHits().length;
                  count.addAndGet(hitCount);
                  log.info("Previous returned {} hits totaling {}", hitCount, count.get());

                  if (count.get() > input.getMaxTotalSize()
                  || hitCount < input.getMaxSizePerQuery()){
                    log.info("Returning empty");
                    return Mono.empty();
                  }

                  log.info("Calling search");
                  pitId.set(new PitId(searchResponse.pointInTimeId()));
                  return reactiveElasticsearchClient.searchForResponse(
                      createSearchRequestFrom(searchResponse, input));
                }))
            .doFinally(
                p -> {
                  deletePIT(pitId.get());
                });
    return expand;
  }

因此,問(wèn)題不是使用被動(dòng)客戶端在Flux中返回Mono<SearchResponse>的能力,而是根據(jù)訂閱者的需要一次只返回一個(gè)。

下面是上面的Flux->Mono方法的日志記錄,PitTest日志記錄來(lái)自通量的測(cè)試onNext()。

2021-12-02 13:13:37.300  INFO 13704 --- [           main] a.a.t.ReactivePointInTimeIteratorFactory : Creating flux
2021-12-02 13:13:37.346  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Creating PIT
2021-12-02 13:13:37.407  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.176  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 50
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877306267
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 100
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606162
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 150
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.272  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606362
2021-12-02 13:13:38.311  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 200
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877906244
2021-12-02 13:13:38.344  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 250
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Returning empty
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Closing PIT ReactivePointInTimeIteratorFactory.PitId(id=m_2xAwENYWN0aXZpdHlzdG9yZRZQQkRGWldmclI2cWZITEpoWDI1cGlRABZCZU8xbm55ZlFabXREYmNEdThESG1RAAAAAAAAWQcTFm5BcXdPU2xTUWE2bEU4dkVPVkpkWFEBFlBCREZaV2ZyUjZxZkhMSmhYMjVwaVEAAA==)
2021-12-02 13:13:40.171  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877306066]
2021-12-02 13:13:42.172  INFO 13704 --- [     parallel-2] p.actss.activity.store.PitTest           : [1634877306272]
2021-12-02 13:13:44.172  INFO 13704 --- [     parallel-3] p.actss.activity.store.PitTest           : [1634877606166]
2021-12-02 13:13:46.173  INFO 13704 --- [     parallel-4] p.actss.activity.store.PitTest           : [1634877906057]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877906248]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : Complete
2021-12-02 13:13:48.174  INFO 13704 --- [           main] p.actss.activity.store.PitTest           : blah
2021-12-02 13:13:48.175  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : onComplete

更新:添加PitTest代碼以確保完整性:


  @Test
  void testReactoiveFluxIt() throws InterruptedException {
    Flux<SearchResponse> deepQuery = reactivePointInTimeIteratorFactory.createDeepQuery(...);

    deepQuery
        .delayElements(Duration.ofMillis(2000))
        .doOnNext(p -> log.info(Arrays.toString(p.getHits().getHits()[0].getSortValues()))) //
        .doOnComplete(() -> log.info("Complete")) //
        .doFinally(p -> log.info(p.toString()))
        .blockLast();
    log.info("blah");
    Thread.sleep(5000);
  }

推薦答案

delayElements切換到并行調(diào)度程序并將每個(gè)發(fā)出的元素延遲2秒。這就是以后打印排序值的原因。

這篇關(guān)于如何使用生成返回Mono的包裝調(diào)用來(lái)創(chuàng)建Flux的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,

分享到:
標(biāo)簽:Flux Mono 創(chuàng)建 如何使用 生成 調(diào)用 返回
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定