Business pain points
Talking about technology divorced from the business is pointless, so let's look at the data synchronization approaches that grow out of real business pain points.
Common business scenarios that call for data synchronization
1. Merging tables from multiple databases into one. Different business lines or microservices are developed against different databases, yet some reports need to merge similar data from several databases for queries and statistics. Or, for historical reasons (say, an unclear business model in the early days), business lines were split and merged repeatedly, or an edge business was gradually folded into the main one: the early data and operations were separate, and were later merged into a single larger business.
2. Writing one piece of data to multiple stores. Business data needs to land in several middlewares or stores: for example, data stored in MySQL later needs to go into Elasticsearch for search or into Redis for caching, or MySQL sharded tables need to be merged and written into Doris.
3. Data warehouse scenarios. For example, writing table data in real time into the wide tables of the warehouse's DWS layer.
4. Emergency scenarios. Without a dedicated CDC solution, the only way to get real-time queries is to have the BFF layer call multiple query APIs on the domain services and do the aggregation and computation there. This is slow to develop, and if some domain service lacks a suitable query API, building one ad hoc makes the schedule unpredictable.
In short, whether it is multi-writing data, merging tables, or building a data warehouse, these are all data synchronization tasks.
Why data synchronization needs a dedicated system
Doing this kind of work inside business code is not sustainable. A business system should stay decoupled and focus on business logic; data synchronization belongs in a dedicated system. If you bolt synchronization onto the business system, then to keep it reliable you must write a great deal of synchronization and fault-tolerance code (efficiency, concurrency, data consistency, clustering, and so on). That overloads the business system, and in the long run it becomes nearly unmaintainable.
Enter CDC
Given the problems above, the protagonist of this article, Flink CDC, takes the stage: Flink CDC is built specifically for data synchronization (synchronization plus computation). With a CDC tool, synchronization tasks are decoupled from the business system, and a single stream of change data can be written to multiple stores. This not only keeps the business system decoupled, it also makes the synchronization tasks more robust and easier to maintain.
CDC fundamentals
What is CDC
CDC stands for Change Data Capture. It synchronizes the incremental changes of a source database (Source) to one or more destinations (Sink). During synchronization the data can also be processed, for example filtered, joined, grouped, or aggregated.
Today the specialized middleware for receiving and parsing database change events is Debezium; for capturing MySQL there is also Canal.
Debezium official site: https://debezium.io/
The official definition: "Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong."
How CDC works
When a source table changes, the operation is recorded through a trigger attached to the table, the binlog, or a similar mechanism. Downstream consumers subscribe to and consume these events via the database's low-level protocol, then replay the change records against the target, which achieves synchronization. The advantage of this approach is its low latency: it captures every kind of upstream change precisely.
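A minimal sketch of the "replay" idea, using an in-memory map as the target "table". The event shape loosely imitates Debezium's op/after convention but is simplified to plain maps; this is an illustration of the concept, not how Flink CDC is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplayDemo {
    // Target "table": primary key -> row, kept in memory for illustration.
    static Map<String, Map<String, Object>> table = new HashMap<>();

    // Replay one change event. op follows Debezium's convention:
    // "c"/"r" = insert/snapshot read, "u" = update, "d" = delete.
    static void apply(String op, String pk, Map<String, Object> after) {
        switch (op) {
            case "c":
            case "r":
            case "u":
                table.put(pk, after);   // upsert the after-image
                break;
            case "d":
                table.remove(pk);       // delete by primary key
                break;
        }
    }

    public static void main(String[] args) {
        apply("c", "1", Map.of("id", "1", "name", "a"));
        apply("u", "1", Map.of("id", "1", "name", "b"));
        apply("d", "1", null);
        System.out.println(table.size()); // prints 0
    }
}
```

Replaying the ordered event stream like this is what lets a downstream store converge to the same state as the source.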
Flink CDC
What is Flink CDC
Official site: https://ververica.github.io/flink-cdc-connectors/
The official definition: "This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture (CDC)." In other words, Flink CDC provides a set of source connectors that use change data capture to ingest change data directly from different databases.
Why Flink CDC
1. Flink CDC wraps and integrates the Debezium connectors, simplifying configuration and use, and adds higher-level APIs and features such as data format conversion and event-time processing. Flink CDC uses the Debezium connectors as its underlying implementation and combines them with Flink's data processing capabilities. By configuring Flink CDC, you can easily turn a database's change stream into a Flink DataStream or Table and do real-time processing, transformation, and analysis.
2. Flink's DataStream and SQL APIs are mature and easy to use.
3. Flink supports state backends, which allow massive amounts of state to be stored.
4. Flink has a richer ecosystem, with more supported sources and sinks.
數(shù)據(jù)流向?qū)Ρ?/h3>
數(shù)據(jù)合并流向:
數(shù)據(jù)多寫流向:
Comparing the technical options
There are many comparisons of data synchronization solutions online; I will only compare the two I have used in practice: Canal and Flink CDC.
Data pipeline comparison
The figure below shows that Canal's data pipeline is longer than Flink CDC's, and a longer pipeline means more opportunities for failure.
In my own Canal practice, once the changes landed in Kafka I had a Spring Boot microservice consume Kafka and handle the business logic, which added yet more complexity; the internal data joins were done through Dubbo API calls. I do not recommend this approach. To be fair, that was my solution before I discovered Flink. I have used an even worse one, too: a scheduled job scanning tables and writing the results into another database.
Structure of the change data
When MySQL commits multiple rows in one transaction, Canal delivers them as a single message, while Flink CDC delivers one event per row. Flink CDC's shape is easier to process.
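To make the difference concrete, here is a small hypothetical sketch: a Canal-style message carries all changed rows in one `data` array, and flattening it yields the one-event-per-row shape that Flink CDC gives you directly. The message fields are simplified to plain maps for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FlattenDemo {
    // A Canal-style message carries one "data" array with N row images;
    // a Flink CDC stream would deliver N separate events instead.
    // This hypothetical helper flattens the batch into per-row events.
    static List<Map<String, Object>> flatten(Map<String, Object> canalMessage) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> rows =
                (List<Map<String, Object>>) canalMessage.get("data");
        return new ArrayList<>(rows); // one element per changed row
    }

    public static void main(String[] args) {
        Map<String, Object> msg = Map.of(
                "table", "uc_company",
                "type", "UPDATE",
                "data", List.of(Map.of("id", "G00001"), Map.of("id", "G00002")));
        System.out.println(flatten(msg).size()); // prints 2
    }
}
```

With Flink CDC you never need this flattening step, which is one less place for bugs to hide.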
Canal data format:
{"data":[{"id":"G00002","name":"潞城市小螞蟻家政保潔有限公司","province_id":"32","province":"江蘇省","city_id":"3201","city":"南京市","district_id":"320114","district":"雨花臺區(qū)","address":"科創(chuàng)城23222333444","logo_url":"http://bm-oss.oss-cn-hangzhou.aliyuncs.com/jfe-app3.0/baba/goods/icon_112.png","slogan":"歡迎來到","credit_code":"2343243","master_name":"江鑫","master_idcard":"532524199911304246","power_group_id":"9996","opt_time":"2021-10-19 17:41:57","add_user_id":"132","add_user_name":"測試","add_time":"2020-07-27 13:59:34","emAIl":"123456@qq.com","master_wechat":null,"service_phone":"13232323232","max_shop_num":"5","pay_mode":null,"business_license":"https://guard.bm001.com/kacloud/null/image/MXVSeb7Pv4r1f1346237770562140.jpg","idcard_front":"https://guard.bm001.com/kacloud/null/image/13uty9yyvrlvWb346237808202139.png","idcard_back":"https://guard.bm001.com/kacloud/null/image/5uA7IblI6r1zoW346237778042125.png","cloud_shop_state":"0","expiration_time":null,"version_id":null,"company_type":"0","company_property":"1","main_sell":null,"introduction":null,"contact_name":null,"contact_phone":null,"certification_name":"潞城市小螞蟻家政保潔有限公司","is_test":null,"login_account":null,"delete_at":"0","self_invitation_code":"IN6501"}],"database":"cloud_test","es":1669010586000,"id":8150,"isDdl":false,"mysqlType":{"id":"varchar(32)","name":"varchar(64)","province_id":"int(6)","province":"varchar(32)","city_id":"int(6)","city":"varchar(32)","district_id":"int(6)","district":"varchar(64)","address":"varchar(128)","logo_url":"varchar(500)","slogan":"varchar(255)","credit_code":"varchar(18)","master_name":"varchar(16)","master_idcard":"varchar(18)","power_group_id":"bigint(20)","opt_time":"datetime","add_user_id":"varchar(32)","add_user_name":"varchar(32)","add_time":"datetime","email":"varchar(255)","master_wechat":"varchar(255)","service_phone":"varchar(32)","max_shop_num":"int(11)","pay_mode":"int(1)","business_license":"varchar(128)","idcard_front":"varchar(128)","idcard_back":"varchar(128)","cloud_shop_st
ate":"int(1)","expiration_time":"datetime","version_id":"bigint(20)","company_type":"tinyint(2)","company_property":"int(2)","main_sell":"varchar(200)","introduction":"varchar(512)","contact_name":"varchar(32)","contact_phone":"varchar(11)","certification_name":"varchar(64)","is_test":"int(1)","login_account":"varchar(50)","delete_at":"bigint(14)","self_invitation_code":"char(6)"},"old":[{"address":"科創(chuàng)城23222333"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"name":12,"province_id":4,"province":12,"city_id":4,"city":12,"district_id":4,"district":12,"address":12,"logo_url":12,"slogan":12,"credit_code":12,"master_name":12,"master_idcard":12,"power_group_id":-5,"opt_time":93,"add_user_id":12,"add_user_name":12,"add_time":93,"email":12,"master_wechat":12,"service_phone":12,"max_shop_num":4,"pay_mode":4,"business_license":12,"idcard_front":12,"idcard_back":12,"cloud_shop_state":4,"expiration_time":93,"version_id":-5,"company_type":-6,"company_property":4,"main_sell":12,"introduction":12,"contact_name":12,"contact_phone":12,"certification_name":12,"is_test":4,"login_account":12,"delete_at":-5,"self_invitation_code":1},"table":"uc_company","ts":1669010468134,"type":"UPDATE"}
Flink CDC data format:
{"before":{"id":"PF1784570096901248","pay_order_no":null,"out_no":"J1784570080435328","title":"充值辦卡","from_user_id":"PG11111","from_account_id":"1286009802396288","user_id":"BO1707796995184000","account_id":"1707895210106496","amount":13400,"profit_state":1,"profit_time":1686758315000,"refund_state":0,"refund_time":null,"add_time":1686758315000,"remark":"充值辦卡","acct_circle":"PG11111","user_type":92,"from_user_type":90,"company_id":"PG11111","profit_mode":1,"type":2,"parent_id":null,"oc_profit_id":"1784570096901248","keep_account_from_user_id":null,"keep_account_from_bm_user_id":null,"keep_account_user_id":null,"keep_account_bm_user_id":null,"biz_company_id":"PG11111"},"after":{"id":"PF1784570096901248","pay_order_no":null,"out_no":"J1784570080435328","title":"充值辦卡","from_user_id":"PG11111","from_account_id":"1286009802396288","user_id":"BO1707796995184000","account_id":"1707895210106496","amount":13400,"profit_state":1,"profit_time":1686758315000,"refund_state":0,"refund_time":null,"add_time":1686758315000,"remark":"充值辦卡1","acct_circle":"PG11111","user_type":92,"from_user_type":90,"company_id":"PG11111","profit_mode":1,"type":2,"parent_id":null,"oc_profit_id":"1784570096901248","keep_account_from_user_id":null,"keep_account_from_bm_user_id":null,"keep_account_user_id":null,"keep_account_bm_user_id":null,"biz_company_id":"PG11111"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1686734882000,"snapshot":"false","db":"cloud_test","sequence":null,"table":"acct_profit","server_id":1,"gtid":null,"file":"mysql-bin.000514","pos":650576218,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1686734882689,"transaction":null}
How to use it
Flink CDC can synchronize data in two ways: with Flink SQL, or with the DataStream and Table API. For ease of management, I write both in code.
Preparation
1. Prepare a Flink cluster. Flink CDC jobs are submitted to a Flink cluster for execution like any other Flink job. You can download and install Flink following the official guide: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/try-flink/local_installation/
2. Enable the MySQL binlog. This step is left to you.
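For reference, a typical my.cnf fragment looks like the one below; the server-id and log file name are example values, so adjust them to your environment:

```ini
# my.cnf -- example settings for enabling the binlog (values are illustrative)
[mysqld]
server-id        = 1          # any id unique within the replication topology
log_bin          = mysql-bin  # enables the binary log
binlog_format    = ROW        # CDC tools require row-based logging
binlog_row_image = FULL       # keep full before/after row images
```

After restarting MySQL you can verify the settings with `SHOW VARIABLES LIKE 'log_bin';` and `SHOW VARIABLES LIKE 'binlog_format';`.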
The Flink SQL way
For ease of management, the Flink SQL approach is also written in Java code.
1. Create the databases
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS cloud_test");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS league_test");
2. Create the source table
Note that the connector type is 'connector' = 'mysql-cdc'.
tEnv.executeSql("CREATE TABLE league_test.oc_settle_profit (\n" +
"    id STRING,\n" +
"    show_profit_id STRING,\n" +
"    order_no STRING,\n" +
"    from_user_id STRING,\n" +
"    from_user_type INT,\n" +
"    user_id STRING,\n" +
"    user_type INT,\n" +
"    rate INT,\n" +
"    amount INT,\n" +
"    type INT,\n" +
"    add_time TIMESTAMP,\n" +
"    state INT,\n" +
"    expect_profit_time TIMESTAMP,\n" +
"    profit_time TIMESTAMP,\n" +
"    profit_mode INT,\n" +
"    opt_code STRING,\n" +
"    opt_name STRING,\n" +
"    acct_circle STRING,\n" +
"    process_state INT,\n" +
"    parent_id STRING,\n" +
"    keep_account_from_user_id STRING,\n" +
"    keep_account_from_bm_user_id STRING,\n" +
"    keep_account_user_id STRING,\n" +
"    keep_account_bm_user_id STRING,\n" +
"    biz_type INT,\n" +
"    remark STRING,\n" +
"    contribute_user_id STRING,\n" +
"    relation_brand_owner_id STRING,\n" +
"    PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
"    'connector' = 'mysql-cdc',\n" +
"    'hostname' = '10.20.1.11',\n" +
"    'port' = '3306',\n" +
"    'username' = 'root',\n" +
"    'password' = '123456',\n" +
"    'database-name' = 'league_test',\n" +
"    'table-name' = 'oc_settle_profit',\n" +
"    'scan.incremental.snapshot.enabled' = 'false'\n" +
")");
3. Create the sink table
Note that the connector type is 'connector' = 'jdbc'.
tEnv.executeSql("CREATE TABLE cloud_test.dws_profit_record_hdj_flink (\n" +
"    id STRING,\n" +
"    show_profit_id STRING,\n" +
"    order_no STRING,\n" +
"    from_user_id STRING,\n" +
"    from_user_type INT,\n" +
"    user_id STRING,\n" +
"    user_type INT,\n" +
"    amount INT,\n" +
"    profit_time TIMESTAMP,\n" +
"    state INT,\n" +
"    acct_circle STRING,\n" +
"    biz_type INT,\n" +
"    contribute_user_id STRING,\n" +
"    relation_brand_owner_id STRING,\n" +
"    remark STRING,\n" +
"    add_time TIMESTAMP,\n" +
"    PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
"    'connector' = 'jdbc',\n" +
"    'url' = 'jdbc:mysql://10.20.1.11:3306/cloud_test',\n" +
"    'username' = 'root',\n" +
"    'password' = 'root12345',\n" +
"    'table-name' = 'dws_profit_record_hdj_flink'\n" +
")");
4. Execute the insert
If you need multi-table joins, you can register several 'connector' = 'jdbc' source tables and then write something along the lines of INSERT INTO ... SELECT ... JOIN here.
tEnv.executeSql("INSERT INTO cloud_test.dws_profit_record_hdj_flink (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
"    user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
"    contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
"select f.id,\n" +
"       f.show_profit_id,\n" +
"       f.order_no,\n" +
"       f.from_user_id,\n" +
"       f.from_user_type,\n" +
"       f.user_id,\n" +
"       f.user_type,\n" +
"       f.amount,\n" +
"       f.profit_time,\n" +
"       f.state,\n" +
"       f.acct_circle,\n" +
"       f.biz_type,\n" +
"       f.contribute_user_id,\n" +
"       f.relation_brand_owner_id,\n" +
"       f.remark,\n" +
"       f.add_time\n" +
"from league_test.oc_settle_profit f\n" +
"where f.id is not null\n" +
"  and f.biz_type is not null\n" +
"  and f.biz_type = 9");
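If the statement does need a join, it might look like the sketch below. Note that `dim_user`, `dws_profit_enriched`, and their columns are hypothetical names used purely for illustration; the pattern is a plain `INSERT INTO ... SELECT ... JOIN` over the registered tables:

```sql
-- Hypothetical enrichment join: oc_settle_profit is the CDC source,
-- dim_user would be a dimension table registered with 'connector' = 'jdbc'.
INSERT INTO cloud_test.dws_profit_enriched
SELECT f.id,
       f.amount,
       u.user_name,      -- column pulled in from the dimension table
       f.add_time
FROM league_test.oc_settle_profit f
JOIN league_test.dim_user u ON u.id = f.user_id
WHERE f.biz_type = 9
```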
That's it for the Flink SQL approach: from now on, whenever the source table changes, the change is picked up automatically and written into the target table.
The DataStream and Table API way
Personally I find this approach a bit more verbose, but also more flexible: you can implement a lot of logic in plain Java, which gives you more room than SQL.
1. Listen to the source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(MYSQL_HOST)
.port(MYSQL_PORT)
.databaseList(SYNC_DB) // set captured database
.tableList(String.join(",", SYNC_TABLES)) // set captured table
.username(MYSQL_USER)
.password(MYSQL_PASSWD)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(5000);
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + LeagueOcSettleProfit2DwsHdjProfitRecordAPI.class.getName());
2. Clean the data (filtering, transformation, and so on)
The logic here is use-case specific; in this article we filter out unrelated tables and then drop delete events.
Filter out unrelated tables:
private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
    return source.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String row) throws Exception {
            try {
                JSONObject rowJson = JSON.parseObject(row);
                // the "source" section of the change record carries the table name
                JSONObject sourceJson = rowJson.getJSONObject("source");
                String tbl = sourceJson.getString("table");
                return table.equals(tbl);
            } catch (Exception ex) {
                ex.printStackTrace();
                return false;
            }
        }
    });
}
Drop delete events:
private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
return source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String row, Collector<String> out) throws Exception {
try {
LOG.info("============================row:{}", row);
JSONObject rowJson = JSON.parseObject(row);
String op = rowJson.getString("op");
// "r" = snapshot read, "c" = insert, "u" = update
if (Arrays.asList("r", "c", "u").contains(op)) {
out.collect(rowJson.getJSONObject("after").toJSONString());
} else {
LOG.info("filter other op:{}", op);
}
} catch (Exception ex) {
LOG.warn("filter other format binlog:{}", row);
}
}
});
}
Apply the business logic; here part of the data is filtered out:
private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {
return cleanStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String data) throws Exception {
try {
JSONObject dataJson = JSON.parseObject(data);
String id = dataJson.getString("id");
Integer bizType = dataJson.getInteger("biz_type");
if (StringUtils.isBlank(id) || bizType == null) {
return false;
}
// only process the job-card data (biz_type = 9)
return bizType == 9;
} catch (Exception ex) {
LOG.warn("filter other format binlog:{}", data);
return false;
}
}
});
}
3. Create a custom sink and write the data out
private static class CustomDealDataSink extends RichSinkFunction<String> {
private transient Connection cloudConnection;
private transient PreparedStatement cloudPreparedStatement;
private String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
"    user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
"    contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?,\n" +
"        ?, ?, ?, ?, ?, ?, ?, ?)";
private String deleteSql = "delete from dws_profit_record_hdj_flink_api where id = '%s'";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// initialize the JDBC connection here
cloudConnection = DriverManager.getConnection("jdbc:mysql://10.20.1.11:3306/cloud_test", "root", "123456");
cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);
}
@Override
public void invoke(String value, Context context) throws Exception {
JSONObject dataJson = JSON.parseObject(value);
String id = dataJson.getString("id");
String showProfitId = dataJson.getString("show_profit_id");
String orderNo = dataJson.getString("order_no");
String fromUserId = dataJson.getString("from_user_id");
Integer fromUserType = dataJson.getInteger("from_user_type");
String userId = dataJson.getString("user_id");
Integer userType = dataJson.getInteger("user_type");
Integer amount = dataJson.getInteger("amount");
Timestamp addTime = dataJson.getTimestamp("add_time");
Integer state = dataJson.getInteger("state");
Timestamp profitTime = dataJson.getTimestamp("profit_time");
String acctCircle = dataJson.getString("acct_circle");
Integer bizType = dataJson.getInteger("biz_type");
String remark = dataJson.getString("remark");
String contributeUserId = dataJson.getString("contribute_user_id");
String relationBrandOwnerId = dataJson.getString("relation_brand_owner_id");
Timestamp profitTimeTimestamp = Timestamp.valueOf(DateFormatUtils.format(profitTime.getTime(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")));
Timestamp addTimeTimestamp = Timestamp.valueOf(DateFormatUtils.format(addTime.getTime(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")));
cloudPreparedStatement.setString(1, id);
cloudPreparedStatement.setString(2, showProfitId);
cloudPreparedStatement.setString(3, orderNo);
cloudPreparedStatement.setString(4, fromUserId);
cloudPreparedStatement.setInt(5, fromUserType);
cloudPreparedStatement.setString(6, userId);
cloudPreparedStatement.setInt(7, userType);
cloudPreparedStatement.setInt(8, amount);
cloudPreparedStatement.setTimestamp(9, profitTimeTimestamp);
cloudPreparedStatement.setInt(10, state);
cloudPreparedStatement.setString(11, StringUtils.isBlank(acctCircle) ? "PG11111" : acctCircle);
cloudPreparedStatement.setInt(12, bizType);
cloudPreparedStatement.setString(13, contributeUserId);
cloudPreparedStatement.setString(14, relationBrandOwnerId);
cloudPreparedStatement.setString(15, remark);
cloudPreparedStatement.setTimestamp(16, addTimeTimestamp);
// JDBC does not allow Statement.execute(String) on a PreparedStatement,
// so run the delete-before-insert on a separate plain Statement
try (java.sql.Statement deleteStatement = cloudConnection.createStatement()) {
    deleteStatement.execute(String.format(deleteSql, id));
}
cloudPreparedStatement.execute();
}
@Override
public void close() throws Exception {
super.close();
// close the JDBC connection here
cloudPreparedStatement.close();
cloudConnection.close();
}
}
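A side note on the delete-then-insert pair in the sink above: with MySQL you could achieve the same idempotent effect in a single statement using `INSERT ... ON DUPLICATE KEY UPDATE`. Here is a hedged sketch that only builds the SQL string; the table and column names are placeholders:

```java
public class UpsertSqlDemo {
    // Builds a MySQL upsert for the given columns; on a primary-key clash
    // the non-key columns are overwritten with the new values.
    // cols[0] is assumed to be the primary key.
    static String buildUpsert(String table, String[] cols) {
        StringBuilder sb = new StringBuilder("INSERT INTO " + table + " (");
        sb.append(String.join(", ", cols)).append(") VALUES (");
        sb.append("?, ".repeat(cols.length - 1)).append("?)");
        sb.append(" ON DUPLICATE KEY UPDATE ");
        for (int i = 1; i < cols.length; i++) { // skip the key column cols[0]
            sb.append(cols[i]).append(" = VALUES(").append(cols[i]).append(")");
            if (i < cols.length - 1) sb.append(", ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String sql = buildUpsert("dws_profit_record_hdj_flink_api",
                new String[]{"id", "amount", "remark"});
        System.out.println(sql);
    }
}
```

The upsert removes one round trip per record and avoids the brief window where the row is absent between the delete and the insert.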
Where to find the code
The repository contains two folders, one for the API approach and one for the SQL approach, with two examples of each: https://github.com/yclxiao/flink-cdc-demo.git
If you run into problems along the way, you can reach me here: http://www.mangod.top/articles/2023/03/15/1678849930601.html