日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

數據實時同步之MongoDB

 

轉載本文需注明出處:微信公眾號EAWorld,違者必究。

?前言:

隨著傳統企業的發展,企業數據呈現多樣化,海量化,難以實現數據快速分析。MongoDB是當前很多企業使用的,當日積月累數據很大時,就可能會忽略歷史數據的價值,可以把數據實時同步到其他儲存:HBASE、HIVE、HDFS文件等等。在當前大數據、云計算的時代潮流下,實現數據價值,對企業決策力、洞察發現力極其有益。

在MongoDB 3.6 之后版本,提供Change Streams API。但目前數據量龐大的仍還是3.6之前版本的歷史悠久企業。這些資產數據是不可缺少的,所以當使用3.6之前版本,兩步走:首先對歷史庫數據遷移。再開始監聽MongoDB庫增量變化,實現MongoDB的監聽和實時同步(Oplog)。

目錄:

1.Oplog簡介

2.MongoDB服務配置啟動

3.Oplog獲取和查看

4.簡單JAVA代碼實現

1.Oplog簡介

1、認識

當在MongoDB的Primary下,我們進行操作庫表時,這些操作會以特殊格式儲存在local庫下的一個固定集合中(下面會介紹到)。Secondary(次)就會通過獲取主的oplog,來進行同步數據,并且存儲自己的Oplog。所以Oplog 也是Mongodb Replication的重要組成了。

2、大小

Mongodb默認將其大小分配的是5%的空閑磁盤空間。也可以在創建 mongod 服務時,在mongo.conf中oplogSize自定義參數設置,單位是mb,如果不指定,不同操作系統上的 oplog 默認大小不同,具體為以下:

For 64-bit linux, Solaris, and FreeBSD systems:可以分配 5% 的剩余空間。如果分配的值仍小于 1GB,那么會分配 1GB。

For 64-bit OS X systems:分配 183MB。

For 32-bit systems:分配 48MB。

oplog的內存占比速度與系統處理寫請求的速度相當,所以很快就會增量更新數據。時間上完全可以支持實時同步。

3、oplog庫表

oplog會自動創建在local庫的collection:

a、master/slave 架構下:local.oplog.$main

數據實時同步之MongoDB

 

b、replica sets 架構下:local.oplog.rs

數據實時同步之MongoDB

 

c、sharding 架構下,mongos下不能查看oplog,可到每一片去

2.MongoDB服務配置啟動

1.解壓當前目錄

tar zxvf mongodb-linux-x86_64-3.2.22.tgz -C ./ mongodb-3.2.22

2.創建data、logs/mongodb.log文件夾

數據實時同步之MongoDB

 

3. bin下創建mongodb.conf自定義配置

數據實時同步之MongoDB

 

4. 創建啟動腳本

start-mongodb.sh,賦權chmod +x start-mongodb.sh

數據實時同步之MongoDB

 

5. 啟動 ./start-mongodb.sh

6.測試

./mongo,默認進入的collections是test,PRIMARY節點

數據實時同步之MongoDB

 


數據實時同步之MongoDB

 

3.oplog獲取和查看

1. oplog數據結構

分析oplog中字段的含義

  • ts: 8字節的時間戳,由4字節unix timestamp + 4字節自增計數表示。這個值很重要,在選舉(如master宕機時)新primary時,會選擇ts最大的那個secondary作為新primary
  • op:1字節的操作類型
  • "i":insert
  • "u":update
  • "d":delete
  • "c":db cmd
  • "db":聲明當前數據庫 (其中ns 被設置成為=>數據庫名稱+ '.')
  • "n": no op,即空操作,其會定期執行以確保時效性
  • ns:操作所在的namespace
  • o:操作所對應的document,即當前操作的內容(比如更新操作時要更新的的字段和值)
  • o2: 在執行更新操作時的where條件,僅限于update時才有該屬性

2. 查看oplog的基本信息

通過"db.printReplicationInfo()"命令可以查看oplog的信息

數據實時同步之MongoDB

 

字段說明:

  • configured oplog size:oplog文件大小
  • log length start to end: oplog日志的啟用時間段
  • oplog first event time: 第一個事務日志的產生時間
  • oplog last event time: 最后一個事務日志的產生時間
  • now: 現在的時間

3、查看oplog日志數據

這里我們一般會重視數據的變化,所以列出insert、update、delete示例

添加一條數據:

db.test.insert({"name":"這是一側測試","age":"18"})

oplog日志數據:

{
    "ts" : Timestamp(1588728789, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "i",
    "ns" : "runoob.test",
    "o" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482"),
        "name" : "這是一側測試",
        "age" : "18"
    }
}

修改:

db.test.update({"_id": ObjectId("5eb213d5ce1474899c3a2482")},{$set:{"name":"這是修改的測試","age":"20"}},false,true)
{
    "ts" : Timestamp(1588730210, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "u",
    "ns" : "runoob.test",
    "o2" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482")
    },
    "o" : {
        "$set" : {
            "name" : "這是修改的測試",
            "age" : "20"
        }
    }
}

刪除:

db.test.remove({"name" : "這是修改的測試"})
{
    "ts" : Timestamp(1588730347, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "d",
    "ns" : "runoob.test",
    "o" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482")
    }
}

4.簡單Java代碼實現

1、maven依賴引入

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.41</version>
</dependency>
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongo-java-driver</artifactId>
  <version>3.2.2</version>
</dependency>

2、配置文件resource/mongo-config.properties

connectionsPerHost=10
connectTimeout=10000
cursorFinalizerEnabled=true
maxWaitTime=120000
threadsAllowedToBlockForConnectionMultiplier=5
readSecondary=false
socketTimeout=0
socketKeepAlive=false
write=0
writeTimeout=0
journal=false
hostConfString=127.0.0.1:27017
userName=adminUser
useCollection=admin
password=adminPass

3、MongoDBUtil.java工具類

/**
 * @author wxb
 * @date 2019-10-12 11:26
 */
public class MongoDBUtil {
    private static MongoClient mongoClient;
    private static Properties properties;
    private static WriteConcern concern;
    static {
        try {
            InputStream inputStream = MongoDBUtil.class.getClassLoader().getResourceAsStream("mongo-config.properties");
            properties = new Properties();
            properties.load(inputStream);
            concern = new WriteConcern(Integer.parseInt(properties.getProperty("write")),
                    Integer.parseInt(properties.getProperty("writeTimeout")));
            concern.withJournal(Boolean.valueOf(properties.getProperty("journal")));//讀取journal參數值

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 初始化,返回客戶端
     */
    public static MongoClient initMongoHasUser() throws IOException {
        List<ServerAddress> adds = new ArrayList<>();
        String[] address = properties.getProperty("hostConfString").split(":");//讀取服務IP地址和端口號
        ServerAddress serverAddress = new ServerAddress(address[0], Integer.valueOf(address[1]));
        adds.add(serverAddress);
        List<MongoCredential> credentials = new ArrayList<>();
        MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(
                properties.getProperty("userName"),
                properties.getProperty("useCollection"),
                properties.getProperty("passWord").toCharArray());
        credentials.add(mongoCredential);
        MongoClientOptions options = MongoClientOptions.builder()
                .connectionsPerHost(Integer.parseInt(properties.getProperty("connectionsPerHost")))
                .connectTimeout(Integer.parseInt(properties.getProperty("connectTimeout")))
                .cursorFinalizerEnabled(Boolean.valueOf(properties.getProperty("cursorFinalizerEnabled")))
                .maxWaitTime(Integer.parseInt(properties.getProperty("maxWaitTime")))
                .threadsAllowedToBlockForConnectionMultiplier(Integer.parseInt(properties
                        .getProperty("threadsAllowedToBlockForConnectionMultiplier")))
                .socketTimeout(Integer.valueOf(properties.getProperty("socketTimeout")))
                .socketKeepAlive(Boolean.valueOf(properties.getProperty("socketKeepAlive")))
                .writeConcern(concern)
                .build();
        if (adds.size() > 1){
            mongoClient = new MongoClient(adds, credentials, options);
        }else {
            mongoClient = new MongoClient(adds.get(0), credentials, options);
        }
        return mongoClient;
    }
}

4、MongoDBOpLog.java 集成了庫驗證、表查詢、數據動態獲取

4.1測試初始化客戶端-持有數據庫

    public class MongoDBOpLog {
    private static MongoClient mongoClient;
    public static void main(String[] args) throws InterruptedException {
        initMongoClient();
        //獲取local庫
        MongoDatabase database = getDatabase("local");
        //監控庫oplog.$main
        MongoCollection<Document> runoob = getCollection(database, "oplog.$main");
        //處理
        dataProcessing(runoob);
    }
    private static void initMongoClient() {
        try {
            mongoClient = MongoDBUtil.initMongoHasUser();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static MongoDatabase getDatabase(String dataBase) {
        if (!mongoClient.getDatabaseNames().contains(dataBase)) {
            throw new RuntimeException(dataBase + " no exist !");
        }
        MongoDatabase mongoDatabase = mongoClient.getDatabase(dataBase);
        return mongoDatabase;
    }

4.2獲取表對象

  /**
     * 獲取表對象
          * @param mongoDatabase
     * @param testCollection
     * @return
     */
    public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) {
        MongoCollection<Document> collection = null;
        try {
            //獲取數據庫dataBase下的集合collecTion,如果沒有將自動創建
            collection = mongoDatabase.getCollection(testCollection);
        } catch (Exception e) {
            throw new RuntimeException("獲取" + mongoDatabase.getName() + "數據庫下的" + testCollection + "集合 failed !" + e);
        }
        return collection;
    }

4.3獲取數據流處理

    /**
     * 解析操作類型
     * @param op
     * @return
     */
    private static String getEventType(String op) {
        switch (op) {
            case "i":
                return "insert";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                return "other";
        }
    }
    /**
     * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空
     * @return JSONObject
     */
    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
        JSONObject columns = new JSONObject();// 存放變化后的字段
        result.put("columns", columns);
        result.put("condition", new JSONObject()); // 條件
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("_id")) {
                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
                continue;
            }
            columns.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
      case "d":                return "delete";            default:                return "other";        }    }    /**     * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空     * @return JSONObject     */    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {        JSONObject columns = new JSONObject();// 存放變化后的字段        result.put("columns", columns);        result.put("condition", new JSONObject()); // 條件        for (Map.Entry<String, Object> entry : document.entrySet()) {            if (entry.getKey().equalsIgnoreCase("_id")) {                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());                continue;            }            columns.put(entry.getKey(), entry.getValue());        }        return result;    }

4.4數據流標準化

    /**
     * 解析操作類型
     * @param op
     * @return
     */
    private static String getEventType(String op) {
        switch (op) {
            case "i":
                return "insert";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                return "other";
        }
    }
    /**
     * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空
     * @return JSONObject
     */
    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
        JSONObject columns = new JSONObject();// 存放變化后的字段
        result.put("columns", columns);
        result.put("condition", new JSONObject()); // 條件
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("_id")) {
                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
                continue;
            }
            columns.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
      case "d":                return "delete";            default:                return "other";        }    }    /**     * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空     * @return JSONObject     */    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {        JSONObject columns = new JSONObject();// 存放變化后的字段        result.put("columns", columns);        result.put("condition", new JSONObject()); // 條件        for (Map.Entry<String, Object> entry : document.entrySet()) {            if (entry.getKey().equalsIgnoreCase("_id")) {                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());                continue;            }            columns.put(entry.getKey(), entry.getValue());        }        return result;    }

5、結果

5.1新增

數據實時同步之MongoDB

 

5.2更新

數據實時同步之MongoDB

 

5.3刪除

數據實時同步之MongoDB

 

實踐

目前普元數據服務共享平臺DSP(Data Service Platform),已經集成離線開發和在線開發實現單表和多表同步到HBASE的實踐,做到了這一步,并且對客戶的需求完成交付。

總之,對于當前企業數據庫MongoDB,無論是使用Change Streams,還是Oplog增量同步,實現數據匯聚、搭建數據服務共享平臺,提取價值、長久規劃,都是必不可少的。

關于作者: 雨聲,現任普元高級開發工程師,熟悉軟件開發的大數據、Java、常用消息組件等主流技術,有數據采集、消息推送、數據清洗、實時計算、數據可視化的完整開發經驗。

關于EAWorld:微服務,DevOps,數據治理,移動架構原創技術分享。

分享到:
標簽:實時 同步 數據
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定