簡介: IoT企業物聯網平臺開發實戰
企業從聯網平臺開發實戰
通過閱讀本文你將學會一下技能:
- 設備通過MQTT協議與您在阿里云上購買的IoT企業實例建立雙向連接,上報設備采集的數據,監聽云端下達的指令;
- 通過規則引擎配置把上報的數據實時存儲到指定數據庫,無需編寫代碼
- 通過規則引擎配置把上報的數據實時流轉到業務服務器,需要使用AMQP協議SDK
- 業務服務器調用IoT平臺的API,下達控制指令到設備端
創建企業實例
首先,我們登錄物聯網平臺控制臺(https://iot.console.aliyun.com), 點擊購買實例來創建一個企業實例。
然后,在購買頁面,根據實際業務需求,選擇地域、實例類型、設備數量、消息上下行TPS、規則引擎TPS等參數,點擊立即購買,付費成功后,即可看到企業實例創建中。
稍等幾分鐘后,企業實例創建完成。進入企業實例,我們可以看到當前規格參數,設備接入點信息,AMQP訂閱接入點信息,云端API調用接入點信息。如下圖:
創建產品和注冊設備
在企業實例的設備管理頁面,我們需要先創建一個產品家庭溫控器,數據通信以JSON格式,認證方式為設備秘鑰。
在產品的功能定義頁面,我們添加溫度和濕度兩個屬性,具體細節如下圖:
最后,我們在設備管理頁面,基于家庭溫控器產品,注冊一個物理設備,并獲取設備身份認證的三元組。如下圖:
設備接入和上報數據
獲取設備身份三元組后,即可通過MQTT協議接入到我們開通的企業實例。設備端應用程序邏輯如下圖:
完整的Nodejs示例代碼如下:
const mqtt = require('aliyun-iot-mqtt');
// 1. 設備身份信息
var options = {
productKey: "產品productKey",
deviceName: "設備deviceName",
deviceSecret: "設備deviceSecret",
host: "實例化MQTT接入點"
};
// 2. 建立MQTT連接
const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/${options.productKey}/${options.deviceName}/user/get`)
client.on('message', function(topic, message) {
console.log("topic " + topic)
console.log("message " + message)
})
setInterval(function() {
// 3.上報溫濕度數據
client.publish(`/sys/${options.productKey}/${options.deviceName}/thing/event/property/post`, getPostData(), { qos: 0 });
}, 5 * 1000);
function getPostData() {
const payloadJson = {
id: Date.now(),
version: "1.0",
params: {
temperature: Math.floor((Math.random() * 20) + 10),
humidity: Math.floor((Math.random() * 20) + 10)
},
method: "thing.event.property.post"
}
console.log("payloadJson " + JSON.stringify(payloadJson))
return JSON.stringify(payloadJson);
}
啟動模擬腳本后,我們看到設備狀態為在線,從模型數據中可以看到最新上報的溫度和濕度值。
在監控運維的日志服務里,也可以看到設備上報數據的日志。如下圖:
數據存儲到數據庫
首先,我們創建一個表格存儲實例 IoTDataStore,建立一張數據表iot_data,以deviceName和timestamp為主鍵。如下圖:
在IoT企業實例,云產品流轉中創建規則引擎,編寫數據處理SQL,配置流轉目的地為上面創建的數據庫表。
數據處理SQL編輯界面:
數據流轉到表格存儲編輯界面:
當設備有數據上報后,我們就可以在表格存儲的iot_data表中實時看到存儲的數據了。如下圖:
在企業實例的日志服務中,我們可以查看到完整的流轉日志,協助我們排查數據鏈路異常。如下圖:
業務服務器實時接收數據
IoT場景中有些數據需要業務系統實時處理,這時我們可以通過服務端訂閱AMQP方式,實時接收設備上報的數據。
首先,我們要在企業實例的服務端訂閱中,創建一個新的消費組,用來接收特定類型的消息。如下圖:
然后,我們在云產品流轉中創建規則引擎,編寫數據處理SQL,配置流轉目的地為上面創建的服務端訂閱消費組。
最后,我們在業務服務器編寫程序,使用阿里云賬號的AccessKey與IoT企業實例建立AMQP連接,參考代碼如下:
public static void main(String[] args) throws Exception {
//參數說明
String accessKey = "子賬號accessKey";
String accessSecret = "子賬號accessSecret";
String consumerGroupId = "消費組Id";
String iotInstanceId = "企業實例Id";
long timeStamp = System.currentTimeMillis();
//簽名方法:支持hmacmd5,hmacsha1和hmacsha256
String signMethod = "hmacsha1";
String clientId = "ecs_"+System.currentTimeMillis();
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//password組裝
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//按照qpid-jms的規范,組裝連接URL。
String connectionUrl = "failover:(amqps://"+iotInstanceId+".amqp.iothub.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.Apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手動調用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
在AMQP的回調中處理收到的業務數據,參考代碼如下:
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//如果要對收到的消息做耗時的處理,請異步處理,確保這里不要有耗時邏輯。
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
String tag = message.getStringProperty("tag");
logger.info("receive message"
+ ",n topic = " + topic
+ ",n messageId = " + messageId
+ ",n tag = " + tag
+ ",n content = " + content
+"n");
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
}
};
啟動業務服務器后,我們看到不斷有設備數據流轉過來,如下圖:
在企業實例的控制臺,服務端訂閱中,我們也可以看到消費組的運行情況,包括消費速率,消息堆積量,消費客戶端列表,如下圖:
在企業實例的控制臺,日志服務中,我們可以看到完整的消息流轉日志,如下圖:
下達云端控制指令
業務系統通過調用IoT物聯網平臺提供的HTTPS API 可以給指定設備下發控制指令,調用過程如下:
Pub API調用的參考代碼:
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = {
accessKey: "子賬號accessKey",
accessKeySecret: "子賬號accessKeySecret"
};
//1.初始化 API Client
const client = new RPCClient({
accessKeyId: options.accessKey,
secretAccessKey: options.accessKeySecret,
endpoint: 'https://iot.cn-beijing.aliyuncs.com',
apiVersion: '2018-01-20',
});
// 指令內容
const payload = {
washingMode: 2,
washingTime: 30
};
//2.構建Pub API 請求
const params = {
TopicFullName: "下行指令的Topic",
MessageContent: new Buffer(JSON.stringify(payload)).toString("base64"),
ProductKey: "產品ProductKey",
IotInstanceId: "實例化Id",
Qos: 1
};
co(function*() {
//3.發起Pub API調用
try {
const response = yield client.request('Pub', params);
console.log("Pub SUCCESS =====>", JSON.stringify(response));
} catch (err) {
console.log("Pub ERROR =====>", JSON.stringify(err));
}
});
運行日志:
在企業實例的控制臺,日志服務中,我們也可以追蹤到完整的下行鏈路日志,如下圖: