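Process functions are the lowest-level functions in Flink and are typically used for more complex business logic. This article summarizes them. There are several kinds, mainly the basic process function, the keyed process function, and the window process functions; each is explained through its source code and tested with sample code.
Because process functions sit at the lowest API layer, knowing them well makes it easier to handle Flink stream processing.
Official Flink documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/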
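1. Basic process function (ProcessFunction)
First, look at the source of ProcessFunction. It is an abstract class that extends AbstractRichFunction, so a process function has all the features of a rich function.
1. Methods
processElement: holds the processing logic and is called once for every incoming element. It takes three parameters: the input value, the Context, and the Collector used for output.
onTimer: the timer callback. Timers are registered through the TimerService, and onTimer runs when the registered time is reached. Timers can only be used on a KeyedStream.
2. Abstract classes
Context: the context abstract class. It gives access to the timestamp of the current element and to the timerService, which is used to register timers and query the time.
3. Source code
// I: input type
// O: type of the collected output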
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessFunction() {
}
public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
// Logic executed when a timer fires; timers can only be registered when the function runs on a keyed stream
public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
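4. Test code
Use the Linux nc utility to listen on port 9999 and type data into it; Flink then connects to port 9999, reads the data, and processes it.
Install nc:
sudo yum install nc -y
Start listening on port 9999:
nc -lk 9999
The code is as follows: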
/**
* Process function test
*/
public class ProcessFunctionTest {
public static void main(String[] args) throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Connect to port 9999 on host hadoop110 and read the input stream
DataStreamSource<String> streamSource = env.socketTextStream("hadoop110", 9999);
// Wrap each input line in the structure {"userName":"aa","time":xxxxx}
SingleOutputStreamOperator<JSONObject> mapStream = streamSource.map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName",t);
jsonObject.put("time",System.currentTimeMillis());
return jsonObject;
});
// Apply the process function and print what it emits
mapStream.process(new MyProcessFunction()).print("Output of the process function");
env.execute();
}
}
// Custom process function
class MyProcessFunction extends ProcessFunction<JSONObject,String>{
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
System.out.println("User received in processElement: "+ jsonObject.getString("userName"));
collector.collect(jsonObject.getString("userName")+"-----");
}
}
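2. Keyed process function (KeyedProcessFunction)
The keyed process function is the most important one. It is used after keyBy to process a KeyedStream. keyBy partitions the stream by key, so all records with the same key are handled by the same parallel subtask.
KeyedProcessFunction can use timers and the timer service; the test code below uses them to inspect time information and to run a timed task.
KeyedProcessFunction: processes partitioned data; processElement runs once per element.
1. KeyedProcessFunction source code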
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public KeyedProcessFunction() {
}
// Processing method, called for every element
public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
// Timer logic, called when a registered timer fires
public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract K getCurrentKey();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
public abstract K getCurrentKey();
}
}
2. TimerService source code
public interface TimerService {
// Message used when registering a timer is not supported
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
// Message used when deleting a timer is not supported
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
// Current processing time
long currentProcessingTime();
// Current watermark
long currentWatermark();
// Register a processing-time timer
void registerProcessingTimeTimer(long var1);
// Register an event-time timer
void registerEventTimeTimer(long var1);
// Delete a processing-time timer
void deleteProcessingTimeTimer(long var1);
// Delete an event-time timer
void deleteEventTimeTimer(long var1);
}
3. Test code:
/**
* @title: KeyedProcessFunctionTest
* @Author Tian
* @Date: 2023/3/21 22:59
* @Version 1.0
*/
public class KeyedProcessFunctionTest {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Wrap the raw input and attach a timestamp
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-5000);
return jsonObject;
});
// Assign watermarks and use the "time" field as the event time.
// assignTimestampsAndWatermarks returns a new stream, so the result must be kept and used.
SingleOutputStreamOperator<JSONObject> timedStream = sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// Send every record to the same key (true), then apply the keyed process function and print its output
timedStream.keyBy(data -> true).process(new MyKeyedProcessFunction()).print("Result");
env.execute();
}
}
/**
* Custom keyed process function
*/
class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean,JSONObject,String>{
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
System.out.println("Current key: "+ context.getCurrentKey());
System.out.println("Element timestamp: "+ context.timestamp());
System.out.println("Current processing time: "+ context.timerService().currentProcessingTime());
System.out.println("Current watermark: "+ context.timerService().currentWatermark());
// Register a timer at the current processing time + 1 second
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+1000);
// Emit the element as a JSON string
collector.collect(jsonObject.toJSONString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// Logic executed when the timer fires
super.onTimer(timestamp, ctx, out);
System.out.println("Timer fired: "+ ctx.timestamp());
}
}
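The test above registers a processing-time timer for every element. Flink keeps at most one timer per key and timestamp, so rounding the target time down to a coarser resolution lets many elements share a single timer. The following is a minimal plain-Java sketch of that rounding, not Flink code: the class TimerCoalescing and the method coalesce are hypothetical names, and a TreeSet stands in for the per-key timer set. In a real processElement, the rounded value would be what is passed to registerProcessingTimeTimer.

```java
import java.util.TreeSet;

final class TimerCoalescing {
    /** Rounds (nowMs + delayMs) down to a multiple of resolutionMs (non-negative inputs assumed). */
    static long coalesce(long nowMs, long delayMs, long resolutionMs) {
        return ((nowMs + delayMs) / resolutionMs) * resolutionMs;
    }

    public static void main(String[] args) {
        // The set models "one timer per key and timestamp": duplicates collapse.
        TreeSet<Long> timers = new TreeSet<>();
        long[] arrivals = {10_050L, 10_400L, 10_999L, 11_001L};
        for (long now : arrivals) {
            timers.add(coalesce(now, 1_000L, 1_000L));
        }
        // Four elements end up sharing two timers.
        System.out.println(timers); // prints [11000, 12000]
    }
}
```

The trade-off is precision: with a one-second resolution a timer may fire up to one second earlier than the exact "arrival + delay" time.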
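3. Window process function (ProcessWindowFunction)
Besides the keyed process function above, there are also process functions for windows. They come in two kinds: the window process function (ProcessWindowFunction) and the all-window process function (ProcessAllWindowFunction). ProcessWindowFunction receives an Iterable containing all elements of the window, plus a context object with access to time and state information, which makes it more flexible than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be aggregated incrementally and must instead be buffered internally until the window is considered ready for processing.
ProcessWindowFunction: processes partitioned data; the process method runs once per key and window.
1. ProcessWindowFunction source code
// IN: input, the type of the elements entering the window
// OUT: output, the type of the result produced by the window computation
// KEY: the type of the key
// W: the window type, a subtype of Window. Time windows are the usual case, so W is normally TimeWindow.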
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessWindowFunction() {
}
// Processes the elements of one window
public abstract void process(KEY var1, ProcessWindowFunction<IN, OUT, KEY, W>.Context var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
// Called when the window is purged; clean up any per-window state here
public void clear(ProcessWindowFunction<IN, OUT, KEY, W>.Context context) throws Exception {
}
// What the context exposes
public abstract class Context implements Serializable {
public Context() {
}
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
2. Test code
public class ProcessWindowsFunctionTest {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Wrap the raw input and attach a timestamp
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
Thread.sleep(5000);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-10000);
return jsonObject;
});
// Assign watermarks and use the "time" field as the event time; keep the returned stream.
// The windows below are processing-time windows, so they do not depend on these watermarks.
SingleOutputStreamOperator<JSONObject> timedStream = sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// Key by user name, apply 5-second tumbling processing-time windows, and print each window's result
timedStream.keyBy(data -> data.getString("userName"))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new MyProcessWindowsFunction()).print();
env.execute();
}
}
class MyProcessWindowsFunction extends ProcessWindowFunction<JSONObject, HashMap<String,Long>, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<JSONObject> iterable, Collector<HashMap<String,Long>> collector) throws Exception {
// Create the result map
HashMap<String, Long> result = new HashMap<>();
// Iterate over the elements in the window
for (JSONObject jsonObject : iterable) {
String userName = jsonObject.getString("userName");
if(result.containsKey(userName)){
Long aLong = result.get(userName);
result.put(userName,aLong+1);
}else {
result.put(userName,1L);
}
}
collector.collect(result);
}
}
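To see which elements end up in the same Iterable, it helps to model how a 5-second tumbling window is aligned. With the default zero offset, windows start at multiples of the window size counted from the epoch, and each element belongs to the window that contains its timestamp (for processing-time windows, the wall-clock time at which the element reaches the window operator). The sketch below is plain Java under that assumption; TumblingWindowSketch and its methods are hypothetical names, not Flink classes.

```java
final class TumblingWindowSketch {
    /** Start (inclusive) of the tumbling window containing timestampMs, zero offset assumed. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the same window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // matches Time.seconds(5) in the test above
        long[] timestamps = {12_345L, 14_999L, 15_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Elements at 12345 and 14999 fall into the same window and are passed to one process call for their key, while the element at 15000 opens the next window.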
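4. All-window process function (ProcessAllWindowFunction)
ProcessAllWindowFunction is similar to ProcessFunction in that both process the elements coming from upstream. The difference is that ProcessFunction calls processElement once per element, while ProcessAllWindowFunction calls process once per window (inside the method you can iterate over all elements of that window).
1. ProcessAllWindowFunction source code
// IN: input, the type of the elements entering the window
// OUT: output, the type of the result produced by the window computation
// W: the window type, a subtype of Window. Time windows are the usual case, so W is normally TimeWindow.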
public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessAllWindowFunction() {
}
// Processing logic for one window
public abstract void process(ProcessAllWindowFunction<IN, OUT, W>.Context var1, Iterable<IN> var2, Collector<OUT> var3) throws Exception;
// Called when the window is purged; clean up any per-window state here
public void clear(ProcessAllWindowFunction<IN, OUT, W>.Context context) throws Exception {
}
// Window context information
public abstract class Context {
public Context() {
}
public abstract W window();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
2. Test code
public class ProcessAllWindowFunctionTest {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Wrap the raw input and attach a timestamp
SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", t);
jsonObject.put("time", System.currentTimeMillis()-10000);
return jsonObject;
});
// Assign watermarks and use the "time" field as the event time; keep the returned stream.
// The windows below are processing-time windows, so they do not depend on these watermarks.
SingleOutputStreamOperator<JSONObject> timedStream = sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return jsonObject.getLong("time");
}
}));
// windowAll ignores keys and puts all elements into one window, so no keyBy is needed here
timedStream
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new MyProcessAllWindowFunction()).print();
env.execute();
}
}
class MyProcessAllWindowFunction extends ProcessAllWindowFunction<JSONObject, HashMap<String,Long>, TimeWindow>{
@Override
public void process(Context context, Iterable<JSONObject> iterable, Collector<HashMap<String, Long>> collector) throws Exception {
// Create the result map
HashMap<String, Long> result = new HashMap<>();
// Iterate over the elements in the window
for (JSONObject jsonObject : iterable) {
String userName = jsonObject.getString("userName");
if(result.containsKey(userName)){
Long aLong = result.get(userName);
result.put(userName,aLong+1);
}else {
result.put(userName,1L);
}
}
collector.collect(result);
}
}
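The counting loop used in the two window functions above can be written more compactly with Map.merge, which inserts 1 for a new name and adds 1 to an existing count. The sketch below works on plain strings instead of JSONObject so that it runs without Flink or a JSON library; WindowCountSketch and countByName are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class WindowCountSketch {
    /** Counts how often each user name occurs in the window contents. */
    static Map<String, Long> countByName(Iterable<String> userNames) {
        Map<String, Long> result = new HashMap<>();
        for (String name : userNames) {
            // Insert 1 if the name is new, otherwise add 1 to the current count.
            result.merge(name, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByName(Arrays.asList("aa", "bb", "aa"));
        System.out.println("aa=" + counts.get("aa") + ", bb=" + counts.get("bb")); // prints aa=2, bb=1
    }
}
```

Inside process, the same one-liner would replace the containsKey/put branch, with the name taken from jsonObject.getString("userName").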
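5. Co-process function for connected streams (CoProcessFunction)
Processing a ConnectedStreams requires defining a separate transformation for each of the two streams, so the interface has two similar methods to implement, distinguished by the suffixes "1" and "2", and each is called when an element arrives on the corresponding stream. Such an interface is called a "co-process function". This mirrors CoMapFunction: calling .flatMap() requires a CoFlatMapFunction that implements flatMap1() and flatMap2(), while calling .process() takes a CoProcessFunction.
1. Source code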
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public CoProcessFunction() {
}
// Processing method for the first stream
public abstract void processElement1(IN1 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// Processing method for the second stream
public abstract void processElement2(IN2 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
public void onTimer(long timestamp, CoProcessFunction<IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
}
public abstract class OnTimerContext extends CoProcessFunction<IN1, IN2, OUT>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
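A common use of the two callbacks is to pair up records from the two streams: whichever side arrives first is buffered under its key until the other side shows up. The following plain-Java model illustrates only that control flow and is not Flink code. The class CoProcessSketch, the order/payment naming, and the output format are hypothetical, and the two HashMaps stand in for what would be keyed state in a real CoProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CoProcessSketch {
    // Stand-ins for keyed state: elements waiting for their counterpart.
    private final Map<String, String> pendingOrders = new HashMap<>();
    private final Map<String, String> pendingPayments = new HashMap<>();
    // Stand-in for the Collector.
    final List<String> output = new ArrayList<>();

    /** Plays the role of processElement1: called for each element of the first stream. */
    void processElement1(String orderId, String order) {
        String payment = pendingPayments.remove(orderId);
        if (payment != null) {
            output.add(orderId + ": " + order + " + " + payment);
        } else {
            pendingOrders.put(orderId, order);
        }
    }

    /** Plays the role of processElement2: called for each element of the second stream. */
    void processElement2(String orderId, String payment) {
        String order = pendingOrders.remove(orderId);
        if (order != null) {
            output.add(orderId + ": " + order + " + " + payment);
        } else {
            pendingPayments.put(orderId, payment);
        }
    }

    public static void main(String[] args) {
        CoProcessSketch sketch = new CoProcessSketch();
        sketch.processElement1("o1", "order");   // buffered, nothing emitted yet
        sketch.processElement2("o2", "paid");    // buffered, nothing emitted yet
        sketch.processElement2("o1", "paid");    // matches the buffered order
        System.out.println(sketch.output);       // prints [o1: order + paid]
    }
}
```

In an actual job, a timer registered in either callback could be used to detect elements whose counterpart never arrives; that part is left out of the sketch.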
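6. Join process function (ProcessJoinFunction)
ProcessJoinFunction is similar to CoProcessFunction, but there are differences.
ProcessJoinFunction is used with stream joins and receives a matched pair of elements, one from each stream.
CoProcessFunction is used on connected streams and processes the elements of the two streams separately.
1. Source code
// IN1: input type of the first stream
// IN2: input type of the second stream
// OUT: output type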
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = -2444626938039012398L;
public ProcessJoinFunction() {
}
// Logic applied after the join; both elements of the matched pair are available
public abstract void processElement(IN1 var1, IN2 var2, ProcessJoinFunction<IN1, IN2, OUT>.Context var3, Collector<OUT> var4) throws Exception;
public abstract class Context {
public Context() {
}
public abstract long getLeftTimestamp();
public abstract long getRightTimestamp();
public abstract long getTimestamp();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
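ProcessJoinFunction is the function passed to an interval join, where a left element and a right element with the same key are paired when the right timestamp lies within a bounded interval around the left timestamp. The plain-Java sketch below models that condition with inclusive bounds, along with the timestamp given to the joined pair (the larger of the two, which is what getTimestamp() reports). IntervalJoinSketch and its method names are hypothetical, and the bounds in the example are arbitrary.

```java
final class IntervalJoinSketch {
    /** True if rightTs lies in [leftTs + lowerBoundMs, leftTs + upperBoundMs]. */
    static boolean joins(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    /** Timestamp assigned to a joined pair: the larger of the two element timestamps. */
    static long joinedTimestamp(long leftTs, long rightTs) {
        return Math.max(leftTs, rightTs);
    }

    public static void main(String[] args) {
        long lower = -2_000L; // right may be up to 2 s earlier than left
        long upper = 1_000L;  // or up to 1 s later
        System.out.println(joins(10_000L, 9_000L, lower, upper));  // prints true
        System.out.println(joins(10_000L, 11_500L, lower, upper)); // prints false
        System.out.println(joinedTimestamp(10_000L, 9_000L));      // prints 10000
    }
}
```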
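7. Broadcast process function (BroadcastProcessFunction)
This is the process function for broadcast connected streams, passed as the argument when calling .process() on a BroadcastConnectedStream. Here the BroadcastConnectedStream is the result of connecting (connect) a regular DataStream that has not been keyed with a broadcast stream (BroadcastStream).
1. Source code
// IN1: element type of the first (non-broadcast) stream
// IN2: element type of the second (broadcast) stream
// OUT: output type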
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = 8352559162119034453L;
public BroadcastProcessFunction() {
}
// Logic for the non-broadcast stream
// IN1: the input element
// ReadOnlyContext: read-only context
// Collector<OUT>: output
public abstract void processElement(IN1 var1, BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
// Logic for the broadcast stream
public abstract void processBroadcastElement(IN2 var1, BroadcastProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// Read-only context
public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
public ReadOnlyContext() {
super(BroadcastProcessFunction.this);
}
}
public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
public Context() {
super(BroadcastProcessFunction.this);
}
}
}
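The split between Context and ReadOnlyContext reflects the usual broadcast pattern: the broadcast side writes shared data such as rules or thresholds, and the data side only reads it. The plain-Java model below illustrates that division of roles and is not Flink code. BroadcastSketch, the metric/threshold example, and the alert format are hypothetical, and a HashMap stands in for broadcast state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BroadcastSketch {
    // Stand-in for broadcast state: metric name -> threshold.
    private final Map<String, Integer> thresholds = new HashMap<>();
    // Stand-in for the Collector.
    final List<String> alerts = new ArrayList<>();

    /** Plays the role of processBroadcastElement: the only place that modifies the state. */
    void processBroadcastElement(String metric, int threshold) {
        thresholds.put(metric, threshold);
    }

    /** Plays the role of processElement: sees the state through a read-only view. */
    void processElement(String metric, int value) {
        Map<String, Integer> readOnly = Collections.unmodifiableMap(thresholds);
        Integer limit = readOnly.get(metric);
        if (limit != null && value > limit) {
            alerts.add(metric + "=" + value);
        }
    }

    public static void main(String[] args) {
        BroadcastSketch sketch = new BroadcastSketch();
        sketch.processElement("cpu", 95);          // no threshold known yet, ignored
        sketch.processBroadcastElement("cpu", 90); // rule arrives on the broadcast side
        sketch.processElement("cpu", 95);          // now exceeds the threshold
        sketch.processElement("cpu", 80);          // below the threshold
        System.out.println(sketch.alerts);         // prints [cpu=95]
    }
}
```

The first call shows a practical consequence of this pattern: data elements that arrive before the corresponding broadcast element cannot be matched unless the job buffers them itself.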
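8. Keyed broadcast process function (KeyedBroadcastProcessFunction)
The keyed broadcast process function is likewise passed as the argument when calling .process() on a BroadcastConnectedStream. Unlike with BroadcastProcessFunction, the broadcast connected stream here is the result of connecting a KeyedStream with a broadcast stream (BroadcastStream).
1. Source code
// KS: type of the key used in keyBy
// IN1: element type of the first (keyed) stream
// IN2: element type of the broadcast stream
// OUT: output type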
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = -2584726797564976453L;
public KeyedBroadcastProcessFunction() {
}
// Logic for the first (keyed) stream
public abstract void processElement(IN1 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
// Logic for the broadcast stream
public abstract void processBroadcastElement(IN2 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
// Logic executed after a timer fires
public void onTimer(long timestamp, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
}
public abstract class OnTimerContext extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
public abstract KS getCurrentKey();
}
public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
public ReadOnlyContext() {
super(KeyedBroadcastProcessFunction.this);
}
public abstract TimerService timerService();
public abstract KS getCurrentKey();
}
public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
public Context() {
super(KeyedBroadcastProcessFunction.this);
}
public abstract <VS, S extends State> void applyToKeyedState(StateDescriptor<S, VS> var1, KeyedStateFunction<KS, S> var2) throws Exception;
}
}