日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

canal 是阿里知名的開源項目,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。

這篇文章,我們手把手向同學們展示使用 canal 將 MySQL 增量數據同步到 ES 。

圖片

1 集群模式

圖片

圖中 server 對應一個 canal 運行實例 ,對應一個 JVM 。

server 中包含 1..n 個 instance , 我們可以將 instance 理解為配置任務。

instance 包含如下模塊 :

  • eventParser數據源接入,模擬 slave 協議和 master 進行交互,協議解析
  • eventSinkParser 和 Store 鏈接器,進行數據過濾,加工,分發(fā)的工作
  • eventStore數據存儲
  • metaManager增量訂閱 & 消費信息管理器

真實場景中,canal 高可用依賴 zookeeper ,筆者將客戶端模式可以簡單劃分為:TCP 模式 和 MQ 模式 。

實戰(zhàn)中我們經常會使用 MQ 模式 。因為 MQ 模式的優(yōu)勢在于解耦 ,canal server 將數據變更信息發(fā)送到消息隊列 kafka 或者 RocketMQ ,消費者消費消息,順序執(zhí)行相關邏輯即可。

順序消費:

對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區(qū)塊分區(qū),同一個分區(qū)內的消息按照嚴格的先進先出(FIFO)原則進行發(fā)布和消費。同一分區(qū)內的消息保證順序,不同分區(qū)之間的消息順序不做要求。

圖片

2 MySQL配置

1、對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf  中配置如下

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

注意:針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步。

2、授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant 。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、創(chuàng)建數據庫商品表 t_product 。

CREATE TABLE `t_product` (
 `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
 `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
 `price` DECIMAL ( 10, 2 ) NOT NULL,
 `status` TINYINT ( 4 ) NOT NULL,
 `create_time` datetime NOT NULL,
 `update_time` datetime NOT NULL,
   PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用 Kibana 創(chuàng)建商品索引 。

PUT /t_product
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mAppings": {
            "properties": {
               "id": {
                    "type":"keyword"
                },
                "name": {
                    "type":"text"
                },
                "price": {
                    "type":"double"
                },
                "status": {
                    "type":"integer"
                },
                "createTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

執(zhí)行完成,如圖所示 :

圖片

4 RocketMQ 配置

創(chuàng)建主題:product-syn-topic ,canal 會將 Binlog 的變化數據發(fā)送到該主題。

圖片圖片

5 canal 配置

我們選取 canal 版本 1.1.6 ,進入 conf 目錄。

1、配置 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本質是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的組件文件 生產環(huán)境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默認值 展示出來 
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為 flat json格式對象
canal.mq.flatMessage = true

2、instance 配置文件

在 conf 目錄下創(chuàng)建實例目錄 product-syn  , 在 product-syn 目錄創(chuàng)建配置文件 :instance.properties。

#  按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 針對庫名或者表名發(fā)送動態(tài)topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partitinotallow=0
# hash partition config
#canal.mq.partitinotallow=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
#################################################

3、服務啟動

啟動兩個 canal 服務,我們從 zookeeper gui 中查看服務運行情況 。

圖片

修改一條 t_product 表記錄,可以從 RocketMQ 控制臺中觀測到新的消息。

圖片

6 消費者

1、產品索引操作服務

圖片

2、消費監(jiān)聽器

圖片

消費者邏輯重點有兩點:

  • 順序消費監(jiān)聽器
  • 將消息數據轉換成  JSON 字符串,從 data 節(jié)點中獲取表最新數據(批量操作可能是多條)。然后根據操作類型 UPDATE、 INSERT、DELETE 執(zhí)行產品索引操作服務的方法。

7 寫到最后

canal 是一個非常有趣的開源項目,很多公司使用 canal 構建數據傳輸服務( Data Transmission Service ,簡稱 DTS ) 。

推薦大家閱讀這個開源項目,你可以從中學習到網絡編程、多線程模型、高性能隊列 Disruptor、 流程模型抽象等。

這篇文章涉及到的代碼已收錄到下面的工程中,有興趣的同學可以一看。

https://Github.com/makemyownlife/rocketmq4-learning

圖片圖片

分享到:
標簽:MySQL
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰(zhàn)2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定