一、目錄介紹
- 功能梳理
- 具體實(shí)現(xiàn)
二、需求梳理
通過前面兩章內(nèi)容的學(xué)習(xí),我們基本學(xué)會了如何使用.NETty 建立一個長連接,接下來我們就在這個基礎(chǔ)上,實(shí)現(xiàn)一個單機(jī)版的 im 系統(tǒng)。
主要功能,我梳理了一下:
- 登錄
- 維持連接、心跳檢測
- 聊天消息
- 消息ack
使用到的相關(guān)組件:
- SpringBoot-job
- GuavaCache
三、具體實(shí)現(xiàn)
本期的內(nèi)容是基于,原理篇一的 dome 代碼基礎(chǔ)上進(jìn)行的,沒有看過的原理一的小伙伴,建議先回顧一下原理篇一。
原理篇一的代碼結(jié)構(gòu):
- Server(主程序)
- ServerHandler(業(yè)務(wù)處理程序)
實(shí)戰(zhàn)篇一的代碼結(jié)構(gòu):
代碼的層級結(jié)構(gòu)如上所示,接下來,我們將會一個個模塊對邏輯進(jìn)行講解。
1、登錄
1)實(shí)現(xiàn)邏輯
不管是長連接還是短連接,鑒權(quán)這個動作都是要有的,我相信這個功能模塊,大家是很好理解的。我這里就不在過多的贅述了,具體實(shí)現(xiàn)步驟如下所示:
1、前后端建立 ws 連接
2、前端發(fā)送登錄類型的報(bào)文,如下所示:
{
"token": "2",
"type": "10"
}
token:這里的 token,就是用戶登錄標(biāo)識,大家可以根據(jù)自己所依賴的業(yè)務(wù)系統(tǒng),進(jìn)行修改。
type:這里表示消息報(bào)文的類型,本文所有類型定義如下所示:
- USER_LOGIN(10, "用戶上線")
- USER_LOGIN_RESP(11, "用戶上線響應(yīng)")
- HEARTBEAT_TIMEOUT(30, "心跳超時(shí)")
- PING(40, "心跳")
- PONG(41, "心跳響應(yīng)")
- CHAT(80, "聊天"),
- CHAT_RESP(81, "聊天響應(yīng)")
- ACK(90, "確認(rèn)")
- ACK_RESP(91, "確認(rèn)響應(yīng)")
- UNKNOWN(0, "未知類型")
示例代碼如下圖所示,WsMsgDispatcher.dispatch
消息類型
3、后端對 token 進(jìn)行校驗(yàn),校驗(yàn)成功就記錄用戶登錄信息。
示例代碼如下圖所示,UserLoginProcessor.login
登錄業(yè)務(wù)邏輯
2)具體效果
主要的業(yè)務(wù)代碼我們已經(jīng)講解完畢了,接下來我們來看看效果:
用戶登錄
從上圖,我們可以看到,我們登錄的兩個用戶都成功了,并且返回了對應(yīng)的用戶信息。
2、維持連接、心跳檢測
這個模塊的功能,其實(shí)我們在原理篇二的時(shí)候已經(jīng)講過了具體的實(shí)現(xiàn)方案了,這里也不再過多的贅述了,我們直接來看具體實(shí)現(xiàn)方法吧。
1)維持連接
1、前端每10秒發(fā)送一次心跳消息,報(bào)文如下所示:
{
"type": "40",
"fromId": "2"
}
注:前端發(fā)送的每個消息,理論上都是需要帶上用戶表示的,后端都是需要進(jìn)行鑒權(quán)操作的。我們這里為了方便講解(偷懶,bushi)將這部分邏輯進(jìn)行了簡化,大家在具體實(shí)現(xiàn)的時(shí)候,記得一定要加上 鑒權(quán)邏輯。
2、后端檢測,用戶是否還在線,如果在線,則刷新用戶的最新在線時(shí)間,并回復(fù) PONG 消息。
示例代碼如下圖所示,HeartBeatProcessor.process
維持連接1
維持連接2
2)心跳檢測
這里主要是基于 IdleStateEvent 事件實(shí)現(xiàn)的。
TextWebSocketFrameHandler 繼承 SimpleChannelInboundHandler 類,并實(shí)現(xiàn) userEventTriggered 方法,具體代碼如下所示:
心跳檢測
這里詳細(xì)說一下,三種事件的區(qū)別:
- readerIdleTimeSeconds:讀超時(shí)。即當(dāng)在指定的時(shí)間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時(shí),會觸發(fā)一個 READER_IDLE 的 IdleStateEvent 事件。
- writerIdleTimeSeconds: 寫超時(shí)。即當(dāng)在指定的時(shí)間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時(shí),會觸發(fā)一個 WRITER_IDLE 的 IdleStateEvent 事件。
- allIdleTimeSeconds: 讀/寫超時(shí)。即當(dāng)在指定的時(shí)間間隔內(nèi)沒有讀且沒有寫操作時(shí),會觸發(fā)一個 ALL_IDLE 的 IdleStateEvent 事件。
所以,我們這里檢測 ALL_IDLE 事件即可。
3)具體效果
維持連接效果如下所示:
維持連接效果
心跳檢測效果如下所示:
心跳超時(shí)效果1
心跳超時(shí)效果2
3、聊天消息
聊天消息模塊主要分為兩部分:
- 消息接收:客戶端推送消息到服務(wù)端
- 消息推送:服務(wù)端將消息推送到指定的客戶端
這邊主要的難點(diǎn)在于,服務(wù)端將消息推送到指定的客戶端,具體場景有2種情況:
- 消息的發(fā)送者和消息的接受者,在同一臺服務(wù)器上建立的 ws 連接,這種情況,就很好處理,直接在服務(wù)器上找到建立的 ws 連接,然后將消息推送給對應(yīng)的客戶端。
- 消息的發(fā)送者和消息的接受者,在不同的服務(wù)器上建立的 ws 連接,這種情況就比較復(fù)雜,實(shí)現(xiàn)方案也很多,比較簡單的實(shí)現(xiàn)方式就是,發(fā)送一條廣播消息,讓對應(yīng)的服務(wù)器,將消息推送到指定的客戶端。
本文由于是 單機(jī)版 的 im,所以只會有第一種情況發(fā)生,第二種情況就留給大家自由發(fā)揮了。
1)消息接收
具體步驟如下所示:
1、客戶端發(fā)送類型為80的報(bào)文,如下所示:
{
"type": "80",
"fromId": "1",
"toId": "2",
"content": {
"contentType": 1,
"body": "測試消息"
}
}
2、服務(wù)端(ChatProcessor)對消息進(jìn)行處理,具體代碼如下所示:
消息接收
2)消息推送
具體步驟如下所示:
1、獲取消息接受者所連接的服務(wù)器 ip 地址 2、判斷當(dāng)前服務(wù)器 ip 地址是否和上面的 ip 地址相同,如果相同則推送消息,否則轉(zhuǎn)發(fā)給目標(biāo)服務(wù)器
具體代碼如下所示:
消息推送
3)具體效果
1、我們先登錄兩個用戶,分別是張三、李四,如下圖所示:
聊天登錄
2、張三發(fā)送消息給李四,如下圖所示:
張三發(fā)送消息給李四
3、李四發(fā)送消息給張三,如下圖所示:
李四發(fā)送消息給張三
4、消息 ack
因?yàn)榫W(wǎng)絡(luò)環(huán)境異常或者其他異常狀況的發(fā)送,可能會出現(xiàn)消息推送失敗的情況,這時(shí)候就需要 消息 ack 機(jī)制和重試,來保證我們的消息可以推送成功。
1)消息 ack 機(jī)制
具體步驟如下:
1、客戶端收到 80 類型的消息,解析并發(fā)送 ack 報(bào)文,如下所示:
{
"type": "90",
"msgId": "2bfea133-72a8-4315-82aa-80049fe4fb7b"
}
2、服務(wù)端收到 ack 消息,變更消息狀態(tài)(AckProcessor),具體代碼如下圖所示:
消息ack
2)消息重試
這里因?yàn)槭菃螜C(jī)版 im,所以直接采用 SpringBoot-Job 實(shí)現(xiàn),Job 代碼如下所示:
消息重試
以上文章來源于JAVA知音 ,作者啊杰