MQTT**(**消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發布/訂閱范式的消息協議。它工作在TCP/IP協議族上,是為硬件性能低下的遠程設備以及網絡狀況糟糕的情況下而設計的發布/訂閱型消息協議,為此,它需要一個消息中間件。
一、前景摘要
- DES版本:DevEco Studio 3.0 Release
- SDK版本:3.2.2.5 ( API9)
- npm版本:6.14.16
- EMQX:linux(Ubuntu)
- MQTTX:Version: v1.9.2
二、了解MQTT
1、什么是MQTT?
MQTT**(**消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發布/訂閱范式的消息協議。它工作在TCP/IP協議族上,是為硬件性能低下的遠程設備以及網絡狀況糟糕的情況下而設計的發布/訂閱型消息協議,為此,它需要一個消息中間件。
2、MQTT特性
MQTT協議是為大量計算能力有限,且工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:
使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合;
- 對負載內容屏蔽的消息傳輸。
- 使用 TCP/IP 提供網絡連接。
- 有三種消息發布服務質量:QoS(定閱等級),分0、1、2三個等級,簡單來說是等級越高越可靠。
- 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量。
- 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
3、應用場景(多客戶端,少量消息)
- 車聯網
- 工業互聯網
- 智能家居
- 視頻直播彈幕
- IM實時聊天(一對一聊天,群組聊天)
- 推送服務,比如推送實時新聞
- 金融交易數據訂閱推送
4、基本名詞
MQTT協議中的三種身份
- MQTT Broker:代理服務器
- Publish:發布者,發布消息
- Subscribe:訂閱者,訂閱消息
MQTT傳輸的消息
主題(Topic),負載(payload)和QoS
- Topic:消息的類型,訂閱者訂閱后就會收到該主題的消息內容(payload).
- Payload:消息的內容,指訂閱者具體要使用的內容。
- QoS:服務質量.
“至多一次”(QoS0):消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。即是推送之后就完事了,至于對方有沒有收到,收到是什么,數據有沒有丟失,都不管。
“至少一次”(QoS1):確保消息到達,但消息重復可能會發生。即是你收到推送后,你還得返回一個puback給對方,告訴對方收到了,不然對方會以為你沒收到,隔一段時間后重新給你推送,直到你給對方返回一個Puback為止。
“只有一次”(QoS2):確保消息到達一次。這一級別可用于如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
異常中斷的機制
使用 Last Will 和 Testament 特性通知有關各方客戶端。
- Last Will:即遺言機制,用于通知同一主題下的其他設備發送遺言的設備已經斷開了連接。
- Testament:遺言機制,功能類似于Last Will。
三、MQTT的簡單使用
1、搭建MQTT服務器EMQX
MQTT服務器有很多種,且部署方式也不一樣。
Linux (Ubuntu)
安裝命令:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx :安裝
sudo systemctl start emqx :啟動
下載鏈接:https://www.emqx.io/zh/downloads?os=Ubuntu
Docker:https://emqx.io/zh/downloads?os=Docker
Linux(centos):https://www.emqx.io/zh/downloads?os=CentOS
windows:https://www.emqx.io/zh/downloads?os=Windows
測試:確保emqx已正常運行后,可在Linux本地瀏覽器中輸入: http://127.0.0.1:18083。
注意:未登錄需修改密碼,兩次保持一致。
局域網其他電腦訪問須知Linux的IP地址。
2、MQTTX
創建連接
創建subscription
收發消息
#代表通配符,代表訂閱所有該類型的topic。
3、在OpenHarmony使用MQTT
安裝依賴
依賴地址:https://gitee.com/openharmony-tpc/ohos_mqtt。
有所不同的是我用的npm安裝的依賴npm install @ohos/mqtt。
創建Mqtt客戶端
創建mqtt客戶端并建立連接:
const clientOptions: MqttClientOptions = {
url: '192.168.xxx.xxx/:1883',
clientId: 'client_id_' + new Date().getTime(),
persistenceType: 1,
}
const connectOptions: MqttConnectOptions = {
userName: '',
password: '',
connectTimeout: 30,
}
this.mqClient = MqttAsync.createMqtt(this.clientOptions);
this.mqClient.connect(this.connectOptions, (data: MqttResponse) => {
console.log(TAG+" data: "+JSON.stringify(data));
if (data.code == 0) {
this.messageArrived();
this.subscribe('主設備號/#');
}
});
監聽
//接收消息,使用此接口后,當訂閱的主題有消息發布時,會自動接收到消息。
public messageArrived(): void {
this.mqClient.messageArrived((err, data) => {
console.log(TAG+"messageArrived!!!!!!!!!!!");
console.log(TAG+"messageArrived data:"+JSON.stringify(data));
});
}
訂閱消息
// 訂閱消息
public subscribe(topic: string, qos: QoS = 1): void {
const subscribeOption: MqttSubscribeOptions = { topic, qos };
this.mqClient.subscribe(subscribeOption, (err, data)=>{
this.handleMessage(data)
});
}
發布消息
// 發布消息
public publish<T>(topic: string, payload: string | Record<string, any>, qos: QoS = 0): void {
if (typeof payload !== 'string') {
payload = JSON.stringify(payload)
}
const payloadLen = payload.length;
const publishOption: MqttPublishOptions = { topic, payload, qos, payloadLen };
console.log(TAG, 'publishOption data: ' + JSON.stringify(publishOption));
this.mqClient.publish(publishOption, (err, data)=>{
console.log(TAG+"publish!!!!!!!!!!!");
console.log(TAG+"publish data:"+JSON.stringify(data));
}));
}
根據訂閱的消息的主題的不同進行不同的處理
handleMessage(data:any){
console.log(TAG+"subscribe!!!!!!!!!!!");
console.log(TAG+"subscribe data:"+JSON.stringify(data));
//根據data的不同進行不同的處理
}
四、擴展
mqtt與MQ中間件的關系。
消息中間件是基于隊列與消息傳遞技術,在網絡環境中為應用系統提供同步或異步、可靠的消息傳輸的支撐性軟件系統。
MQTT 與消息隊列有一定的區別,隊列是一種先進先出的數據結構,消息隊列常用于應用服務層面,實現參考如 RabbitMQ Kafka RocketMQ。
MQTT 是傳輸協議,絕大部分 MQTT Broker 不保證消息順序(Queue),常用語物聯網、消息傳輸等。