轉載本文需注明出處:微信公眾號EAWorld,違者必究。
?前言:
隨著傳統企業的發展,企業數據呈現多樣化,海量化,難以實現數據快速分析。MongoDB是當前很多企業使用的,當日積月累數據很大時,就可能會忽略歷史數據的價值,可以把數據實時同步到其他儲存:HBASE、HIVE、HDFS文件等等。在當前大數據、云計算的時代潮流下,實現數據價值,對企業決策力、洞察發現力極其有益。
在MongoDB 3.6 之后版本,提供Change Streams API。但目前數據量龐大的仍還是3.6之前版本的歷史悠久企業。這些資產數據是不可缺少的,所以當使用3.6之前版本,兩步走:首先對歷史庫數據遷移。再開始監聽MongoDB庫增量變化,實現MongoDB的監聽和實時同步(Oplog)。
目錄:
1.Oplog簡介
2.MongoDB服務配置啟動
3.Oplog獲取和查看
4.簡單JAVA代碼實現
1.Oplog簡介
1、認識
當在MongoDB的Primary下,我們進行操作庫表時,這些操作會以特殊格式儲存在local庫下的一個固定集合中(下面會介紹到)。Secondary(次)就會通過獲取主的oplog,來進行同步數據,并且存儲自己的Oplog。所以Oplog 也是Mongodb Replication的重要組成了。
2、大小
Mongodb默認將其大小分配的是5%的空閑磁盤空間。也可以在創建 mongod 服務時,在mongo.conf中oplogSize自定義參數設置,單位是mb,如果不指定,不同操作系統上的 oplog 默認大小不同,具體為以下:
For 64-bit linux, Solaris, and FreeBSD systems:可以分配 5% 的剩余空間。如果分配的值仍小于 1GB,那么會分配 1GB。
For 64-bit OS X systems:分配 183MB。
For 32-bit systems:分配 48MB。
oplog的內存占比速度與系統處理寫請求的速度相當,所以很快就會增量更新數據。時間上完全可以支持實時同步。
3、oplog庫表
oplog會自動創建在local庫的collection:
a、master/slave 架構下:local.oplog.$main

b、replica sets 架構下:local.oplog.rs

c、sharding 架構下,mongos下不能查看oplog,可到每一片去
2.MongoDB服務配置啟動
1.解壓當前目錄
tar zxvf mongodb-linux-x86_64-3.2.22.tgz -C ./ mongodb-3.2.22
2.創建data、logs/mongodb.log文件夾

3. bin下創建mongodb.conf自定義配置

4. 創建啟動腳本
start-mongodb.sh,賦權chmod +x start-mongodb.sh

5. 啟動 ./start-mongodb.sh
6.測試
./mongo,默認進入的collections是test,PRIMARY節點


3.oplog獲取和查看
1. oplog數據結構
分析oplog中字段的含義
- ts: 8字節的時間戳,由4字節unix timestamp + 4字節自增計數表示。這個值很重要,在選舉(如master宕機時)新primary時,會選擇ts最大的那個secondary作為新primary
- op:1字節的操作類型
- "i":insert
- "u":update
- "d":delete
- "c":db cmd
- "db":聲明當前數據庫 (其中ns 被設置成為=>數據庫名稱+ '.')
- "n": no op,即空操作,其會定期執行以確保時效性
- ns:操作所在的namespace
- o:操作所對應的document,即當前操作的內容(比如更新操作時要更新的的字段和值)
- o2: 在執行更新操作時的where條件,僅限于update時才有該屬性
2. 查看oplog的基本信息
通過"db.printReplicationInfo()"命令可以查看oplog的信息

字段說明:
- configured oplog size:oplog文件大小
- log length start to end: oplog日志的啟用時間段
- oplog first event time: 第一個事務日志的產生時間
- oplog last event time: 最后一個事務日志的產生時間
- now: 現在的時間
3、查看oplog日志數據
這里我們一般會重視數據的變化,所以列出insert、update、delete示例
添加一條數據:
db.test.insert({"name":"這是一側測試","age":"18"})
oplog日志數據:
{
"ts" : Timestamp(1588728789, 1),
"h" : NumberLong(0),
"v" : 2,
"op" : "i",
"ns" : "runoob.test",
"o" : {
"_id" : ObjectId("5eb213d5ce1474899c3a2482"),
"name" : "這是一側測試",
"age" : "18"
}
}
修改:
db.test.update({"_id": ObjectId("5eb213d5ce1474899c3a2482")},{$set:{"name":"這是修改的測試","age":"20"}},false,true)
{
"ts" : Timestamp(1588730210, 1),
"h" : NumberLong(0),
"v" : 2,
"op" : "u",
"ns" : "runoob.test",
"o2" : {
"_id" : ObjectId("5eb213d5ce1474899c3a2482")
},
"o" : {
"$set" : {
"name" : "這是修改的測試",
"age" : "20"
}
}
}
刪除:
db.test.remove({"name" : "這是修改的測試"})
{
"ts" : Timestamp(1588730347, 1),
"h" : NumberLong(0),
"v" : 2,
"op" : "d",
"ns" : "runoob.test",
"o" : {
"_id" : ObjectId("5eb213d5ce1474899c3a2482")
}
}
4.簡單Java代碼實現
1、maven依賴引入
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.2.2</version>
</dependency>
2、配置文件resource/mongo-config.properties
connectionsPerHost=10
connectTimeout=10000
cursorFinalizerEnabled=true
maxWaitTime=120000
threadsAllowedToBlockForConnectionMultiplier=5
readSecondary=false
socketTimeout=0
socketKeepAlive=false
write=0
writeTimeout=0
journal=false
hostConfString=127.0.0.1:27017
userName=adminUser
useCollection=admin
password=adminPass
3、MongoDBUtil.java工具類
/**
* @author wxb
* @date 2019-10-12 11:26
*/
public class MongoDBUtil {
private static MongoClient mongoClient;
private static Properties properties;
private static WriteConcern concern;
static {
try {
InputStream inputStream = MongoDBUtil.class.getClassLoader().getResourceAsStream("mongo-config.properties");
properties = new Properties();
properties.load(inputStream);
concern = new WriteConcern(Integer.parseInt(properties.getProperty("write")),
Integer.parseInt(properties.getProperty("writeTimeout")));
concern.withJournal(Boolean.valueOf(properties.getProperty("journal")));//讀取journal參數值
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 初始化,返回客戶端
*/
public static MongoClient initMongoHasUser() throws IOException {
List<ServerAddress> adds = new ArrayList<>();
String[] address = properties.getProperty("hostConfString").split(":");//讀取服務IP地址和端口號
ServerAddress serverAddress = new ServerAddress(address[0], Integer.valueOf(address[1]));
adds.add(serverAddress);
List<MongoCredential> credentials = new ArrayList<>();
MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(
properties.getProperty("userName"),
properties.getProperty("useCollection"),
properties.getProperty("passWord").toCharArray());
credentials.add(mongoCredential);
MongoClientOptions options = MongoClientOptions.builder()
.connectionsPerHost(Integer.parseInt(properties.getProperty("connectionsPerHost")))
.connectTimeout(Integer.parseInt(properties.getProperty("connectTimeout")))
.cursorFinalizerEnabled(Boolean.valueOf(properties.getProperty("cursorFinalizerEnabled")))
.maxWaitTime(Integer.parseInt(properties.getProperty("maxWaitTime")))
.threadsAllowedToBlockForConnectionMultiplier(Integer.parseInt(properties
.getProperty("threadsAllowedToBlockForConnectionMultiplier")))
.socketTimeout(Integer.valueOf(properties.getProperty("socketTimeout")))
.socketKeepAlive(Boolean.valueOf(properties.getProperty("socketKeepAlive")))
.writeConcern(concern)
.build();
if (adds.size() > 1){
mongoClient = new MongoClient(adds, credentials, options);
}else {
mongoClient = new MongoClient(adds.get(0), credentials, options);
}
return mongoClient;
}
}
4、MongoDBOpLog.java 集成了庫驗證、表查詢、數據動態獲取
4.1測試初始化客戶端-持有數據庫
public class MongoDBOpLog {
private static MongoClient mongoClient;
public static void main(String[] args) throws InterruptedException {
initMongoClient();
//獲取local庫
MongoDatabase database = getDatabase("local");
//監控庫oplog.$main
MongoCollection<Document> runoob = getCollection(database, "oplog.$main");
//處理
dataProcessing(runoob);
}
private static void initMongoClient() {
try {
mongoClient = MongoDBUtil.initMongoHasUser();
} catch (IOException e) {
e.printStackTrace();
}
}
public static MongoDatabase getDatabase(String dataBase) {
if (!mongoClient.getDatabaseNames().contains(dataBase)) {
throw new RuntimeException(dataBase + " no exist !");
}
MongoDatabase mongoDatabase = mongoClient.getDatabase(dataBase);
return mongoDatabase;
}
4.2獲取表對象
/**
* 獲取表對象
* @param mongoDatabase
* @param testCollection
* @return
*/
public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) {
MongoCollection<Document> collection = null;
try {
//獲取數據庫dataBase下的集合collecTion,如果沒有將自動創建
collection = mongoDatabase.getCollection(testCollection);
} catch (Exception e) {
throw new RuntimeException("獲取" + mongoDatabase.getName() + "數據庫下的" + testCollection + "集合 failed !" + e);
}
return collection;
}
4.3獲取數據流處理
/**
* 解析操作類型
* @param op
* @return
*/
private static String getEventType(String op) {
switch (op) {
case "i":
return "insert";
case "u":
return "update";
case "d":
return "delete";
default:
return "other";
}
}
/**
* 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空
* @return JSONObject
*/
private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
JSONObject columns = new JSONObject();// 存放變化后的字段
result.put("columns", columns);
result.put("condition", new JSONObject()); // 條件
for (Map.Entry<String, Object> entry : document.entrySet()) {
if (entry.getKey().equalsIgnoreCase("_id")) {
columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
continue;
}
columns.put(entry.getKey(), entry.getValue());
}
return result;
}
case "d": return "delete"; default: return "other"; } } /** * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空 * @return JSONObject */ private static JSONObject resultRow(Document document, JSONObject result, String eventType) { JSONObject columns = new JSONObject();// 存放變化后的字段 result.put("columns", columns); result.put("condition", new JSONObject()); // 條件 for (Map.Entry<String, Object> entry : document.entrySet()) { if (entry.getKey().equalsIgnoreCase("_id")) { columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString()); continue; } columns.put(entry.getKey(), entry.getValue()); } return result; }
4.4數據流標準化
/**
* 解析操作類型
* @param op
* @return
*/
private static String getEventType(String op) {
switch (op) {
case "i":
return "insert";
case "u":
return "update";
case "d":
return "delete";
default:
return "other";
}
}
/**
* 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空
* @return JSONObject
*/
private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
JSONObject columns = new JSONObject();// 存放變化后的字段
result.put("columns", columns);
result.put("condition", new JSONObject()); // 條件
for (Map.Entry<String, Object> entry : document.entrySet()) {
if (entry.getKey().equalsIgnoreCase("_id")) {
columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
continue;
}
columns.put(entry.getKey(), entry.getValue());
}
return result;
}
case "d": return "delete"; default: return "other"; } } /** * 數據解析、格式封裝,返回所有insert、update新數據,delete的老數據,做輸出為邏輯刪除,condition字段為空 * @return JSONObject */ private static JSONObject resultRow(Document document, JSONObject result, String eventType) { JSONObject columns = new JSONObject();// 存放變化后的字段 result.put("columns", columns); result.put("condition", new JSONObject()); // 條件 for (Map.Entry<String, Object> entry : document.entrySet()) { if (entry.getKey().equalsIgnoreCase("_id")) { columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString()); continue; } columns.put(entry.getKey(), entry.getValue()); } return result; }
5、結果
5.1新增

5.2更新

5.3刪除

實踐
目前普元數據服務共享平臺DSP(Data Service Platform),已經集成離線開發和在線開發實現單表和多表同步到HBASE的實踐,做到了這一步,并且對客戶的需求完成交付。
總之,對于當前企業數據庫MongoDB,無論是使用Change Streams,還是Oplog增量同步,實現數據匯聚、搭建數據服務共享平臺,提取價值、長久規劃,都是必不可少的。
關于作者: 雨聲,現任普元高級開發工程師,熟悉軟件開發的大數據、Java、常用消息組件等主流技術,有數據采集、消息推送、數據清洗、實時計算、數據可視化的完整開發經驗。
關于EAWorld:微服務,DevOps,數據治理,移動架構原創技術分享。