什么叫做Flink的有狀態計算呢?說白了就是將之前的中間結果暫時存儲起來,等待后續的事件數據過來后,可以使用之前的中間結果繼續計算。本文主要介紹Flink狀態計算和管理、代碼示例。
1、有狀態的計算
什么是Flink的有狀態的計算。在流式計算過程中將算子的中間結果保存在內存或者文件系統中,等下一個事件進入算子后可以從之前的狀態中獲取中間結果,以便計算當前的結果,從而無需每次都基于全部的原始數據來統計結果,極大地提升了系統性能。
每一個具有一定復雜度的流計算應用都是有狀態的,任何運行基本業務邏輯的流處理應用都需要在一定時間內存儲所接受的事件或者中間結果。
2、狀態管理
Flink如何管理狀態?主要就是:本地訪問和存儲、容錯性(可以自動按一定的時間間隔產生快照,并且在任務失敗后進行恢復)。
狀態(State)操作是指需要把當前數據和歷史計算結果進行累加計算,即當前數據的處理需要使用之前的數據或中間結果。
例如,對數據流中的實時單詞進行計數,每當接收到新的單詞時,需要將當前單詞數量累加到之前的結果中。這里單詞的數量就是狀態,對單詞數量的更新就是狀態的更新。如下圖:
狀態的計算模型,如下圖:
如下圖,Source、map()、keyBy()/window()/Apply()算子的并行度為2,Sink算子的并行度為1。keyBy()/window()/apply()算子是有狀態的,并且map()與keyBy()/window()/apply()算子之間通過網絡進行數據分發。
Flink應用程序的狀態訪問都在本地進行,這樣有助于提高吞吐量和降低延遲。通常情況下,Flink應用程序都是將狀態存儲在JVM堆內存中,但如果狀態數據太大,也可以選擇將其以結構化數據格式存儲在高速磁盤中。
通過狀態快照,Flink能夠提供可容錯的、精確一次的計算語義。Flink應用程序在執行時會獲取并存儲分布式Pipeline(流處理管道)中整體的狀態,它會將數據源中消費數據的偏移量記錄下來,并將整個作業圖中算子獲取到該數據(記錄的偏移量對應的數據)時的狀態記錄并存儲下來。
當發生故障時,Flink作業會恢復上次存儲的狀態,重置數據源從狀態中記錄的上次消費的偏移量,開始重新進行消費處理。而且狀態快照在執行時會異步獲取狀態并存儲,并不會阻塞正在進行的數據處理邏輯。這個機制跟Kafka等消息中間件的消費方式很像。
Flink需要知道狀態,以便可以使用Checkpoint和Savepoint來保證容錯(下一篇會繼續介紹)。狀態還允許重新調整Flink應用程序,這意味著Flink負責在并行實例之間重新分配狀態。
3、Keyed State
Keyed State是Flink提供的按照Key進行分區的狀態機制。
在通過keyBy()分組的KeyedStream上使用,對每個Key的數據進行狀態存儲和管理,狀態是跟每個Key綁定的,即每個Key對應一個狀態對象。
Keyed State支持的狀態數據類型如下:ValueState、ListState、ReducingState、AggregatingState<IN, OUT>、MapState<UK, UV>。下文以ValueState為例,介紹如何使用。
4、狀態管理示例和代碼
我們來模擬這樣一個場景:如果某個用戶1分鐘內連續兩次退款,第二次則發出告警。
模擬訂單對象:
public class OrderBO {
/**
* ID
*/
private Integer id;
/***
* 訂單標題
*/
private String title;
/**
* 訂單金額
*/
private Integer amount;
/**
* 訂單狀態:1-已支付,2-已退款
*/
private Integer state;
/**
* 用戶ID
*/
private String userId;
}
利用狀態管理,處理告警邏輯:
/**
* 告警處理邏輯
**/
private static class AlarmLogic extends KeyedProcessFunction<String, OrderBO, OrderBO> {
// 是否已經出現退款的標記
private ValueState<Boolean> flagState;
// 定時器,時間到了會清掉狀態
private ValueState<Long> timerState;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(OrderBO value, KeyedProcessFunction<String, OrderBO, OrderBO>.Context context, Collector<OrderBO> collector) throws Exception {
Boolean refundFlag = flagState.value();
// 如果已經退款過一次了,如果再出現退款則發射給下個算子,然后清理掉定時器。狀態2代表退款
if (refundFlag != null && refundFlag) {
if (value.getState() == 2) {
collector.collect(value);
}
cleanUp(context);
} else {
// 如果第一次出現退款,則寫入狀態,同時開啟定時器。狀態2代表退款
if (value.getState() == 2) {
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
}
/**
* 定時器到了之后,清理狀態值
*/
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, OrderBO, OrderBO>.OnTimerContext ctx, Collector<OrderBO> out) throws Exception {
timerState.clear();
flagState.clear();
}
/**
* 手動清理狀態值
*
* @param ctx
* @throws Exception
*/
private void cleanUp(Context ctx) throws Exception {
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
timerState.clear();
flagState.clear();
}
}
模式生成數據和主流程代碼:
public static void mAIn(String[] args) throws Exception {
// 1、執行環境創建
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、讀取Socket數據端口。實際根據具體業務對接數據來源
DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);
// 3、數據讀取個切割方式
SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream
.flatMap(new CleanDataAnd2Order()) // 清洗和處理數據
.keyBy(x -> x.getUserId()) // 分區
.process(new AlarmLogic()); // 處理告警邏輯
// 4、打印分析結果
resultDataStream.print("告警===>");
// 5、環境啟動
environment.execute("OrderAlarmApp");
}
模擬數據:
模擬場景:某個用戶1分鐘內連續兩次退款,第二次發出告警。
示例數據:
1,aaa,100,1,user1
2,bbb,200,1,user2
3,ccc,300,2,user1
4,ddd,400,2,user1
5,ddd,400,2,user1
6,bbb,200,2,user2
7,bbb,400,2,user2
完整代碼地址:https://Github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/JAVA/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43