日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

目錄
  • Flink 側(cè)流輸出源碼解析
  • 源碼解析
    • TimestampedCollector#collect
    • CountingOutput#collect
    • BroadcastingOutputCollector#collect
    • RecordWriterOutput#collect
    • ProcessOperator#ContextImpl#output
    • CountingOutput#collect
    • BroadcastingOutputCollector#collect
    • RecordWriterOutput#collect
    • OutputTag#isResponsibleFor
    • getSideOutput 源碼
  • 總結(jié)

    Flink 側(cè)流輸出源碼解析

    Flink 的 side output 為我們提供了側(cè)流(分流)輸出的功能,根據(jù)條件可以把一條流分為多個(gè)不同的流,之后做不同的處理邏輯,下面就來看下側(cè)流輸出相關(guān)的源碼。

    先來看下面的一個(gè) Demo,一個(gè)流被分成了 3 個(gè)流,一個(gè)主流,兩個(gè)側(cè)流輸出。

    SingleOutputStreamOperator<JasonLeePOJO> process =
            kafka_source1.process(
                    new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {
                        @Override
                        public void processElement(
                                JasonLeePOJO value,
                                ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,
                                Collector<JasonLeePOJO> out)
                                throws Exception {
                            // 這個(gè)是主流輸出
                            if (value.getName().equals("flink")) {
                                out.collect(value);
                            // 下面兩個(gè)是測流輸出
                            } else if (value.getName().equals("spark")) {
                                ctx.output(test, value);
                            // 測流
                            } else if (value.getName().equals("hadoop")) {
                                ctx.output(test1, value);
                            }
                        }
                    });
    

    為了更加清楚的查看每一個(gè)算子,我禁用了 operator chain,任務(wù)的 DAG 圖如下所示:

    Flink?側(cè)流輸出源碼示例解析

    這樣就比較清晰了,很明顯從 process 算子開始,1 個(gè)數(shù)據(jù)流分為了 3 個(gè)數(shù)據(jù)流,當(dāng)然,在默認(rèn)情況下沒有禁止

    operator chain 所有的算子都是 chain 在一起的。

    源碼解析

    我們先來看第一個(gè)主流輸出也就是 out.collect(value) 的源碼,這里的 out 實(shí)際上是 TimestampedCollector 對(duì)象。

    TimestampedCollector#collect

    @Override
    public void collect(T record) {
        output.collect(reuse.replace(record));
    }
    

    在 collect 方法中持有一個(gè) output 對(duì)象,用來輸出數(shù)據(jù),在這里實(shí)際上是一個(gè) CountingOutput 它是一個(gè)包裝了 Output 的對(duì)象,主要用于更新發(fā)送數(shù)據(jù)的 metric,并輸出數(shù)據(jù)。

    CountingOutput#collect

    @Override
    public void collect(StreamRecord<OUT> record) {
        numRecordsOut.inc();
        output.collect(record);
    }
    

    在 CountingOutput 中也持有一個(gè) output 對(duì)象,但是這里的 output 是 BroadcastingOutputCollector 對(duì)象,從名字就可以看出它是往下游廣播數(shù)據(jù)的,這里就有一個(gè)疑問?把數(shù)據(jù)廣播到下游,那豈不是下游的每個(gè)數(shù)據(jù)流都有這條數(shù)據(jù)嗎?這樣的話是怎么實(shí)現(xiàn)分流的呢?帶著這個(gè)疑問,我們來看 BroadcastingOutputCollector 的 collect 方法是怎么實(shí)現(xiàn)的。

    BroadcastingOutputCollector#collect

    @Override
    public void collect(StreamRecord<T> record) {
        // 這里的 outputs 數(shù)組有三個(gè) output 分別對(duì)應(yīng)上面的三個(gè)輸出流
        for (Output<StreamRecord<T>> output : outputs) {
            output.collect(record);
        }
    }
    

    在 BroadcastingOutputCollector 對(duì)象里也持有一個(gè) output 對(duì)象,其實(shí)他們都實(shí)現(xiàn)了 Output 接口,用來往下游發(fā)送數(shù)據(jù),這里的 outputs 是一個(gè) Output 數(shù)組,代表了下游的所有 Output,因?yàn)樯厦嬗腥齻€(gè)輸出流,所以數(shù)組里面就包含了 3 個(gè) Output 對(duì)象。

    循環(huán)的調(diào)用 output 的 collect 方法往下游發(fā)送數(shù)據(jù),因?yàn)槲掖驍嗔?operator chain,所以 process 算子和下游的 Print 算子不在同一個(gè) operatorChain 內(nèi),那么上下游算子之間數(shù)據(jù)傳輸用的就是 RecordWriterOutput,否則用的是 CopyingChainingOutput 或者 ChainingOutput,具體使用的是哪個(gè) Output 這里就不多介紹了,后面有時(shí)間的話會(huì)單獨(dú)介紹。

    RecordWriterOutput#collect

    @Override
    public void collect(StreamRecord<OUT> record) {
        // 主流是沒有 outputTag 的,只有測流有 outputTag
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
    
        pushToRecordWriter(record);
    }
    

    接著來看 RecordWriterOutput 的 collect 方法,在 collect 方法里面會(huì)先判斷 outputTag 是否為空,如果不為空不做任何處理,直接返回,否則就把數(shù)據(jù)推送到下游算子,只有側(cè)流輸出才需要定義 outputTag,主流(正常流)是沒有 outputTag 的,所以這里會(huì)走 pushToRecordWriter 方法把數(shù)據(jù)寫入到下游,也就是說雖然會(huì)以廣播的形式把數(shù)據(jù)廣播到所有下游,但其實(shí)另外兩個(gè)側(cè)流是直接返回的,只有主流才會(huì)把數(shù)據(jù)推送到下游,這也就解釋了上面的疑問。

    然后再來看第二個(gè)側(cè)流輸出 ctx.output(test, value) 的源碼,這里的 ctx 實(shí)際上是 ProcessOperator#ContextImpl 對(duì)象。

    ProcessOperator#ContextImpl#output

    @Override
    public <X> void output(OutputTag<X> outputTag, X value) {
        if (outputTag == null) {
            throw new IllegalArgumentException("OutputTag must not be null.");
        }
        output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
    }
    

    如果 outputTag 是空,直接拋出異常,因?yàn)檫@個(gè)是側(cè)流,所以必須要定義 OutputTag。這里的 output 實(shí)際上是父類 AbstractStreamOperator 所持有的變量,如果 outputTag 不為空,就調(diào)用 output 的 collect 方法把數(shù)據(jù)發(fā)送到下游,這里的 output 和上面的一樣是 CountingOutput 但是 collect 方法是另外一個(gè)重載的方法。

    CountingOutput#collect

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        numRecordsOut.inc();
        output.collect(outputTag, record);
    }
    

    可以發(fā)現(xiàn),這個(gè) collect 方法比上面那個(gè)多了一個(gè) OutputTag 參數(shù),也就是使用側(cè)流輸出的時(shí)候定義的 OutputTag 對(duì)象,然后調(diào)用 output 的 collect 方法發(fā)送數(shù)據(jù),這個(gè)也和上面的一樣,同樣是 BroadcastingOutputCollector 對(duì)象的另外一個(gè)重載方法,多了一個(gè) OutputTag 參數(shù)。

    BroadcastingOutputCollector#collect

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        for (Output<StreamRecord<T>> output : outputs) {
            output.collect(outputTag, record);
        }
    }
    

    這里的邏輯和上面是一樣的,同樣的循環(huán)調(diào)用 collect 方法發(fā)送數(shù)據(jù)。

    RecordWriterOutput#collect

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        // 先要判斷兩個(gè) OutputTag 是否一樣
        if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
            pushToRecordWriter(record);
        }
    }
    

    在這個(gè) collect 方法中會(huì)先判斷傳入的 OutputTag 對(duì)象和成員變量 this.outputTag 是不是相等,如果是的話,就發(fā)送數(shù)據(jù),否則不做任何處理,所以這里每次只會(huì)選擇一個(gè)下游側(cè)流輸出數(shù)據(jù),這樣就實(shí)現(xiàn)了所謂的分流。

    OutputTag#isResponsibleFor

    public static boolean isResponsibleFor(
            @Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) {
        return other.equals(owner);
    }
    

    可以看到在 isResponsibleFor 方法內(nèi)是直接調(diào)用 OutputTag 的 equals 方法判斷兩個(gè)對(duì)象是否相等的。

    第三個(gè)側(cè)流 test1 ctx.output(test1, value) 和第二個(gè)側(cè)流 test 是完全一樣的情況,這里就不在看代碼了。

    上面是完成了分流操作,那怎么獲取到分流后結(jié)果呢(數(shù)據(jù)流)?我們可以通過 getSideOutput 方法獲取。

    DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test);
    DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);
    

    getSideOutput 源碼

    public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
        sideOutputTag = clean(requireNonNull(sideOutputTag));
    
        // make a defensive copy
        sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());
    
        TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
        if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
            throw new UnsupportedOperationException(
                    "A side output with a matching id was "
                            + "already requested with a different type. This is not allowed, side output "
                            + "ids need to be unique.");
        }
    
        requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
    
        SideOutputTransformation<X> sideOutputTransformation =
                new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
        return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
    }
    

    getSideOutput 方法里先是構(gòu)建了一個(gè) SideOutputTransformation 對(duì)象,然后又構(gòu)建了 DataStream 對(duì)象,這樣我們就可以基于分流后的 DataStream 做不同的處理邏輯了,從而實(shí)現(xiàn)了把一個(gè) DataStream 分流成多個(gè) DataStream 功能。

    總結(jié)

    通過對(duì)側(cè)流輸出的源碼進(jìn)行解析,在分流的時(shí)候,數(shù)據(jù)是通過廣播的方式發(fā)送到下游算子的,對(duì)于主流的數(shù)據(jù)來說,只有 OutputTag 為空的才會(huì)處理,側(cè)流因?yàn)?OutputTag 不為空,所以直接返回,不做任何處理,那對(duì)于側(cè)流的數(shù)據(jù)來說,是通過判斷兩個(gè) OutputTag 是否相等,所以每次只會(huì)把數(shù)據(jù)發(fā)送到下游對(duì)應(yīng)的那一個(gè)側(cè)流上去,這樣即可實(shí)現(xiàn)分流邏輯。

    以上就是Flink 側(cè)流輸出源碼示例解析的詳細(xì)內(nèi)容,更多關(guān)于Flink 側(cè)流輸出的資料請(qǐng)關(guān)注其它相關(guān)文章!

    分享到:
    標(biāo)簽:服務(wù)器 源碼 示例 解析 輸出
    用戶無頭像

    網(wǎng)友整理

    注冊(cè)時(shí)間:

    網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

    • 51998

      網(wǎng)站

    • 12

      小程序

    • 1030137

      文章

    • 747

      會(huì)員

    趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
    最新入駐小程序

    數(shù)獨(dú)大挑戰(zhàn)2018-06-03

    數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

    答題星2018-06-03

    您可以通過答題星輕松地創(chuàng)建試卷

    全階人生考試2018-06-03

    各種考試題,題庫,初中,高中,大學(xué)四六

    運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

    記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

    每日養(yǎng)生app2018-06-03

    每日養(yǎng)生,天天健康

    體育訓(xùn)練成績?cè)u(píng)定2018-06-03

    通用課目體育訓(xùn)練成績?cè)u(píng)定