canal 是阿里知名的開源項目,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。
這篇文章,我們手把手向同學們展示使用 canal 將 MySQL 增量數據同步到 ES 。
1 集群模式
圖中 server 對應一個 canal 運行實例 ,對應一個 JVM 。
server 中包含 1..n 個 instance , 我們可以將 instance 理解為配置任務。
instance 包含如下模塊 :
- eventParser數據源接入,模擬 slave 協議和 master 進行交互,協議解析
- eventSinkParser 和 Store 鏈接器,進行數據過濾,加工,分發(fā)的工作
- eventStore數據存儲
- metaManager增量訂閱 & 消費信息管理器
真實場景中,canal 高可用依賴 zookeeper ,筆者將客戶端模式可以簡單劃分為:TCP 模式 和 MQ 模式 。
實戰(zhàn)中我們經常會使用 MQ 模式 。因為 MQ 模式的優(yōu)勢在于解耦 ,canal server 將數據變更信息發(fā)送到消息隊列 kafka 或者 RocketMQ ,消費者消費消息,順序執(zhí)行相關邏輯即可。
順序消費:
對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區(qū)塊分區(qū),同一個分區(qū)內的消息按照嚴格的先進先出(FIFO)原則進行發(fā)布和消費。同一分區(qū)內的消息保證順序,不同分區(qū)之間的消息順序不做要求。
2 MySQL配置
1、對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
注意:針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步。
2、授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、創(chuàng)建數據庫商品表 t_product 。
CREATE TABLE `t_product` (
`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL ( 10, 2 ) NOT NULL,
`status` TINYINT ( 4 ) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
3 Elasticsearch配置
使用 Kibana 創(chuàng)建商品索引 。
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mAppings": {
"properties": {
"id": {
"type":"keyword"
},
"name": {
"type":"text"
},
"price": {
"type":"double"
},
"status": {
"type":"integer"
},
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"updateTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
執(zhí)行完成,如圖所示 :
4 RocketMQ 配置
創(chuàng)建主題:product-syn-topic ,canal 會將 Binlog 的變化數據發(fā)送到該主題。
5 canal 配置
我們選取 canal 版本 1.1.6 ,進入 conf 目錄。
1、配置 canal.properties
#集群模式 zk地址
canal.zkServers = localhost:2181
#本質是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的組件文件 生產環(huán)境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
###### 以下部分是默認值 展示出來
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為 flat json格式對象
canal.mq.flatMessage = true
2、instance 配置文件
在 conf 目錄下創(chuàng)建實例目錄 product-syn , 在 product-syn 目錄創(chuàng)建配置文件 :instance.properties。
# 按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# table regex
canal.instance.filter.regex=mytest.t_product
# mq config
canal.mq.topic=product-syn-topic
# 針對庫名或者表名發(fā)送動態(tài)topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partitinotallow=0
# hash partition config
#canal.mq.partitinotallow=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
#################################################
3、服務啟動
啟動兩個 canal 服務,我們從 zookeeper gui 中查看服務運行情況 。
修改一條 t_product 表記錄,可以從 RocketMQ 控制臺中觀測到新的消息。
6 消費者
1、產品索引操作服務
2、消費監(jiān)聽器
消費者邏輯重點有兩點:
- 順序消費監(jiān)聽器
- 將消息數據轉換成 JSON 字符串,從 data 節(jié)點中獲取表最新數據(批量操作可能是多條)。然后根據操作類型 UPDATE、 INSERT、DELETE 執(zhí)行產品索引操作服務的方法。
7 寫到最后
canal 是一個非常有趣的開源項目,很多公司使用 canal 構建數據傳輸服務( Data Transmission Service ,簡稱 DTS ) 。
推薦大家閱讀這個開源項目,你可以從中學習到網絡編程、多線程模型、高性能隊列 Disruptor、 流程模型抽象等。
這篇文章涉及到的代碼已收錄到下面的工程中,有興趣的同學可以一看。
https://Github.com/makemyownlife/rocketmq4-learning
圖片