本文介紹了在Flink 1.13中配置RocksDB的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我讀過有關Flink 1.13版本中EmbeddedRocksDBStateBackend
的內容,但有大小限制,因此我希望保留以前Flink版本1.11的當前配置,但重點是這種配置RocksDB的方式已被棄用(new RocksDBStateBackend("path", true);
)。
我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))
嘗試使用新配置,但出現以下錯誤:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
從Java以編程方式為Flink 1.13配置RocksDB狀態后端的最佳方式是什么?
推薦答案
在Flink1.13中,我們重新組織了狀態后端,因為舊的方法導致了對事物如何工作的許多誤解。因此,這兩個問題是分離的:
-
您的工作狀態存儲在哪里(狀態后端)。(對于RocksDB,應將其配置為使用最快的可用本地磁盤。)
存儲檢查點的位置(檢查點存儲)。在大多數情況下,這應該是分布式文件系統。
在舊的API中,通過將檢查點路徑傳遞給RocksDBStateBackend
構造函數的方式,掩蓋了在RocksDB的情況下涉及兩個不同文件系統的事實。因此該配置位已移至其他位置(見下文)。
此表顯示舊狀態后端與新狀態后端(與檢查點存儲結合使用)之間的關系:
傳統狀態后端 | 新狀態后端+檢查點存儲 |
---|---|
MemoryStateBackend |
HashMapStateBackend + JobManagerCheckpointStorage |
FsStateBackend |
HashMapStateBackend + FileSystemCheckpointStorage |
RocksDBStateBackend |
EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage |
在您的案例中,您希望將EmbeddedRocksDBStateBackend
與FileSystemCheckpointStorage
一起使用。您當前遇到的問題是,您正在對RocksDB使用內存中檢查點存儲(JobManagerCheckpointStorage
),這會嚴重限制可以設置檢查點的狀態數量。
您可以通過在flink-conf.yaml
中指定檢查點目錄來修復此問題
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
或在您的代碼中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
有關完整詳細信息,請參閱Migrating from Legacy Backends上的文檔。
這篇關于在Flink 1.13中配置RocksDB的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,