本文介紹了Hazelcast噴氣變化數據捕獲的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在我的應用程序中使用Hazelcast變更數據捕獲(CDC)。
(我之所以使用CDC,是因為如果使用JDBC或其他替代功能將數據加載到緩存中,則需要花費大量時間)
因此CDC將在數據庫和Hazelcast Jet之間進行數據同步。
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("tblName").build();
這里我有以下步驟:-
Pipeline pipeline = Pipeline.create();
// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
.filter(deletedFalse);
deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());
在這里,我使用StreamSource<ChangeRecord> source
對象作為pipeLine
的輸入。
如您所知,source
對象是流類型。
但在我的例子中,管道數據處理取決于用戶輸入數據(一些元數據)。
如果我在數據庫中進行任何更新或刪除。Jet將更新所有流實例。
因為我的數據處理依賴于用戶數據,所以我不想在第一步之后使用流類型。
只需要流形式的第一個StreamSource<ChangeRecord> source;
。
在下一步中,我只想對批處理流程執行此操作;
那么如何在批處理中使用source
。
pipeLine.readFrom(source)
//始終返回Stream類型。那么如何將其轉換為批處理類型。
我又嘗試了一種方法,如:-
從source
讀取并將所有內容沉入地圖。
pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));
再次構造管道ReadFrom from map。
pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());
這只是返回空數據。
所以任何建議都會很有幫助..
推薦答案
只有當您需要持續更新數據時,使用CDC源才有意義。例如,對數據庫中的每一次更新做出反應,或可能將數據加載到映射中,然后在內存中的快照上以某個時間間隔重復運行批處理作業。
在這種情況下,您可能只希望第一次更新發生在CDC源是最新的之后–在它從數據庫讀取所有當前狀態并且只接收對數據庫進行的更新之后。遺憾的是,目前(Hazelcast 5.0)無法使用Jet API判斷何時發生這種情況。
您可以使用一些特定于域的信息-具有您查詢的時間戳字段、映射中存在上次插入的記錄或類似信息。
如果要對數據庫表中的數據運行單個批處理作業,則應使用JDBC源。
(我之所以使用CDC,是因為如果使用JDBC或其他替代功能將數據加載到緩存中,則需要花費大量時間)
使用CDC有它的開銷,這是我們通常不會看到的。在JDBC源代碼中使用像SELECT * FROM table
這樣的普通SQL查詢比使用CDC源代碼更快。也許你沒有衡量處理整個當前狀態所需的時間?如果使用JDBC加載數據比使用CDC加載數據確實需要更多時間,請向復制者提交問題。
這篇關于Hazelcast噴氣變化數據捕獲的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,