這節介紹RocketMQ底層通信的原理
在之前的內容中有介紹過RocketMQ底層用了Netty來進行通信,下圖為RocketMQ通信的大致過程,主要分為Server端和Client端。
客戶端通過invokeSyncImpl、invokeAsyncImpl、invokeOnewayImpl這幾個方法同服務端交互。
1. NettyRemotingServer
Server啟動主要是初始化ServerBootstrap,主要配置如下:
- 設置tcp的參數,包括SO_BACKLOG、SO_REUSEADDR、SO_KEEPALIVE、TCP_NODELAY等。
- 設置pipeline處理鏈,包括編碼、解碼、空閑處理、連接管理、請求分發。
啟動完ServerBootstrap后會啟動一個定時器,每3秒清除超時的請求。
這里介紹下面幾個處理器:
- NettyEncoder
- NettyDecoder
- NettyConnectManageHandler
- NettyServerHandler
1.1. NettyEncoder
NettyEncoder繼承自LengthFieldBasedFrameDecoder,主要有用于解碼入站數據流,并將數據流解碼為RemotingCommand對象。
LengthFieldBasedFrameDecoder(自定義長度解碼器)的構造器,涉及5個參數,都與長度域(數據包中的長度字段)相關,具體介紹如下:
- maxFrameLength:發送的數據包最大長度;
- lengthFieldOffset:長度域偏移量,指的是長度域位于整個數據包字節數組中的下標;
- lengthFieldLength:長度域的自己的字節數長度。
- lengthAdjustment:長度域的偏移量矯正。 如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
- initialBytesToStrip:丟棄的起始字節數。丟棄處于有效數據前面的字節數量。比如前面有4個節點的長度域,則它的值為4。
以NettyEncoder為例,器構造構造方法為
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
即數據流中前4個字節的值表示有效數據域的長度,除開前4個字節外的內容都是有效數據域的內容,不存在偏移量。
接收到數據域的內容后,便會調用RemotingCommand.decode方法,將數據流轉為RemotingCommand對象。
RemotingCommand對象分為Header部分和Body部分。Header部分包括固定的一組字段,已經長度不定的擴展字段;Body部分為byte[],不進行具體的細分。
數據域的解析過程同上面的類似,數據域中前4個自己為Header域的長度,取到Header長度后便能計算出Body長度,從而進行讀取。RemotingCommand的內容如下:
根據serializerType的不同,Header的編碼會分為Json或者二進制的方式。
1.2. NettyDecoder
NettyEncoder的反過程,將RemotingCommand對象序列化為ByteBuffer對象。根據serializerType的不同,Header會編碼為JSON或者二進制。
1.3. NettyConnectManageHandler
NettyConnectManageHandler繼承自ChannelDuplexHandler,用于監聽pipeline中入站/出站的事件,主要進行日志記錄。
1.4. NettyServerHandler
NettyServerHandler繼承自SimpleChannelInboundHandler,重寫了channelRead0方法,在里面調用了父類NettyRemotingAbstract的processMessageReceived方法,如下:
該方法定義了請求和響應的處理過程。
1.processRequestCommand
處理請求過程,先根據RemotingCommand中的code值判斷當前請求是否能夠處理,如果不能處理則直接響應不支持。如果可以支持,則會找到對應的處理器,新起線程來處理當前請求。需要說明的是,NettyRemotingServer內部維護這一個processorTable,表示該server可以處理的command,對應的Processor以及對應的線程池。
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
Processor的定義如下,對于具體的command,會由對應的Processor來處理
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception;
boolean rejectRequest();
}
RocketMQ提供的Processor如下,其中一個Processor可能會處理一個或者多個code.
2.processResponseCommand
客戶端發起一次調用時,會根據請求id,構造一個ResponseFuture,并將其緩存在responseTable字段中,用來表示目前正在進行中的請求。
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
當有響應的時候,會根據請求id,獲取對應的ResponseFuture,再進行后置處理,包括執行回調、釋放資源等。
2. NettyRemotingClient
Client啟動主要是初始化Bootstrap,主要配置如下:
- 設置tcp的參數,包括TCP_NODELAY、SO_KEEPALIVE、CONNECT_TIMEOUT_MILLIS等。
- 設置pipeline處理鏈,包括編碼、解碼、空閑處理、連接管理、請求分發。
啟動完ServerBootstrap后會啟動一個定時器,每3秒清除超時的請求。
Client端處理鏈上的幾個處理器,除了NettyClientHandler外都同Server端的一樣。而NettyClientHandler也繼承自SimpleChannelInboundHandler,并重寫了channelRead0方法,在里面調用了父類NettyRemotingAbstract的processMessageReceived方法,過程跟Server端類似。
3. 調用流程
上面介紹了Server端和Client端的啟動過程,以及消息的編解碼,這里介紹消息的具體請求過程。主要是開頭提到的invokeSyncImpl、invokeAsyncImpl和invokeOnewayImpl這幾個方法。
3.1. invokeSyncImpl 同步調用
內部是通過countdownlatch等待來模擬的同步調用,如下圖:
- 客戶端調用invokeSyncImpl后,client會構造ResponseFeature對象,并根據請求id將其緩存起來,然后調用Netty發送請求后在ChannelFutureListener中等待回調。
- 這時候客戶端會通過countdownlatch等待一定的時間,如果客戶端請求成功,則在ChannelFutureListener中直接返回,等待超時時間到達;如果請求失敗,則直接通知countdownlatch,不再等待,直接返回
- 請求到達服務端,經過NettyDecoder、NettyServerHandler后,會調用processRequestCommand方法,最終在對應類型的線程池中提交任務,任務執行完后通過執行糊掉,返回結果
- 客戶端接收到響應后,通過NettyClientHandler,會加油processResponseCommand方法處理,這時會根據請求id獲取之前的ResponseReature對象,執行回調,最后清除緩存。
3.2. invokeAsyncImpl 異步調用
相比同步調用,少了等待超時時間,但是增加了semaphore信號量控制最多有多少個連接同時執行。請求發起后,將結果對象緩存起來,結果將通過InvokeCallback進行回調,如果有設置回調函數,結果返回,在回調線程發起后就會將信號量回收,如果沒有設置回調函數,結果返回后就會將信號量回收。其余過程大致同同步調用類似。
3.3. invokeOnewayImpl 單步調用
單向請求,無結果,請求成功后不等待結果,直接釋放信號量,服務端也不會返回結果。
3.4. MQClientAPIImpl
MQClientAPIImpl在之前介紹過,主要為Producer和Consumer提供遠程通信調用的功能,內部主要是對NettyRemotingClient的封裝,以對外提供服務,如:
- createSubscriptionGroup,請求broker創建group
- createTopic,請求broker創建創建topic
- sendMessage,發送消息,單步、異步、同步
等多種服務的封裝。同時MQClientAPIImpl也能夠接收服務端的主動請求,從而進行響應,對外提供的具體功能如下,通過調用registerProcessor來添加:
以NOTIFY_CONSUMER_IDS_CHANGED為例,當Broker發現Group中的Consumer實例發生改變的時候,會遍歷客戶的連接Channel,然后逐一通知到客戶端。這時候客戶端的角色轉變為”服務端“,服務端轉變為"客戶端",兩端都會觸發processResponseCommand方法。