日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

這節介紹RocketMQ底層通信的原理

在之前的內容中有介紹過RocketMQ底層用了Netty來進行通信,下圖為RocketMQ通信的大致過程,主要分為Server端和Client端。

Netty-RocketMQ底層通信的利器

 

客戶端通過invokeSyncImpl、invokeAsyncImpl、invokeOnewayImpl這幾個方法同服務端交互。

1. NettyRemotingServer

Server啟動主要是初始化ServerBootstrap,主要配置如下:

Netty-RocketMQ底層通信的利器

 

  1. 設置tcp的參數,包括SO_BACKLOG、SO_REUSEADDR、SO_KEEPALIVE、TCP_NODELAY等。
  2. 設置pipeline處理鏈,包括編碼、解碼、空閑處理、連接管理、請求分發。

啟動完ServerBootstrap后會啟動一個定時器,每3秒清除超時的請求。

這里介紹下面幾個處理器:

  1. NettyEncoder
  2. NettyDecoder
  3. NettyConnectManageHandler
  4. NettyServerHandler

1.1. NettyEncoder

NettyEncoder繼承自LengthFieldBasedFrameDecoder,主要有用于解碼入站數據流,并將數據流解碼為RemotingCommand對象。

LengthFieldBasedFrameDecoder(自定義長度解碼器)的構造器,涉及5個參數,都與長度域(數據包中的長度字段)相關,具體介紹如下:

  1. maxFrameLength:發送的數據包最大長度;
  2. lengthFieldOffset:長度域偏移量,指的是長度域位于整個數據包字節數組中的下標;
  3. lengthFieldLength:長度域的自己的字節數長度。
  4. lengthAdjustment:長度域的偏移量矯正。 如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
  5. initialBytesToStrip:丟棄的起始字節數。丟棄處于有效數據前面的字節數量。比如前面有4個節點的長度域,則它的值為4。

以NettyEncoder為例,器構造構造方法為

public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}

即數據流中前4個字節的值表示有效數據域的長度,除開前4個字節外的內容都是有效數據域的內容,不存在偏移量。

接收到數據域的內容后,便會調用RemotingCommand.decode方法,將數據流轉為RemotingCommand對象。

RemotingCommand對象分為Header部分和Body部分。Header部分包括固定的一組字段,已經長度不定的擴展字段;Body部分為byte[],不進行具體的細分。

Netty-RocketMQ底層通信的利器

 

數據域的解析過程同上面的類似,數據域中前4個自己為Header域的長度,取到Header長度后便能計算出Body長度,從而進行讀取。RemotingCommand的內容如下:

Netty-RocketMQ底層通信的利器

 

根據serializerType的不同,Header的編碼會分為Json或者二進制的方式。

1.2. NettyDecoder

NettyEncoder的反過程,將RemotingCommand對象序列化為ByteBuffer對象。根據serializerType的不同,Header會編碼為JSON或者二進制。

1.3. NettyConnectManageHandler

NettyConnectManageHandler繼承自ChannelDuplexHandler,用于監聽pipeline中入站/出站的事件,主要進行日志記錄。

1.4. NettyServerHandler

NettyServerHandler繼承自SimpleChannelInboundHandler,重寫了channelRead0方法,在里面調用了父類NettyRemotingAbstract的processMessageReceived方法,如下:

Netty-RocketMQ底層通信的利器

 

該方法定義了請求和響應的處理過程。

1.processRequestCommand

處理請求過程,先根據RemotingCommand中的code值判斷當前請求是否能夠處理,如果不能處理則直接響應不支持。如果可以支持,則會找到對應的處理器,新起線程來處理當前請求。需要說明的是,NettyRemotingServer內部維護這一個processorTable,表示該server可以處理的command,對應的Processor以及對應的線程池。

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

Processor的定義如下,對于具體的command,會由對應的Processor來處理

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    boolean rejectRequest();
}

RocketMQ提供的Processor如下,其中一個Processor可能會處理一個或者多個code.

Netty-RocketMQ底層通信的利器

 

2.processResponseCommand

客戶端發起一次調用時,會根據請求id,構造一個ResponseFuture,并將其緩存在responseTable字段中,用來表示目前正在進行中的請求。

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

當有響應的時候,會根據請求id,獲取對應的ResponseFuture,再進行后置處理,包括執行回調、釋放資源等。

2. NettyRemotingClient

Client啟動主要是初始化Bootstrap,主要配置如下:

Netty-RocketMQ底層通信的利器

 

  1. 設置tcp的參數,包括TCP_NODELAY、SO_KEEPALIVE、CONNECT_TIMEOUT_MILLIS等。
  2. 設置pipeline處理鏈,包括編碼、解碼、空閑處理、連接管理、請求分發。

啟動完ServerBootstrap后會啟動一個定時器,每3秒清除超時的請求。

Client端處理鏈上的幾個處理器,除了NettyClientHandler外都同Server端的一樣。而NettyClientHandler也繼承自SimpleChannelInboundHandler,并重寫了channelRead0方法,在里面調用了父類NettyRemotingAbstract的processMessageReceived方法,過程跟Server端類似。

3. 調用流程

上面介紹了Server端和Client端的啟動過程,以及消息的編解碼,這里介紹消息的具體請求過程。主要是開頭提到的invokeSyncImpl、invokeAsyncImpl和invokeOnewayImpl這幾個方法。

3.1. invokeSyncImpl 同步調用

內部是通過countdownlatch等待來模擬的同步調用,如下圖:

Netty-RocketMQ底層通信的利器

 

  1. 客戶端調用invokeSyncImpl后,client會構造ResponseFeature對象,并根據請求id將其緩存起來,然后調用Netty發送請求后在ChannelFutureListener中等待回調。
  2. 這時候客戶端會通過countdownlatch等待一定的時間,如果客戶端請求成功,則在ChannelFutureListener中直接返回,等待超時時間到達;如果請求失敗,則直接通知countdownlatch,不再等待,直接返回
  3. 請求到達服務端,經過NettyDecoder、NettyServerHandler后,會調用processRequestCommand方法,最終在對應類型的線程池中提交任務,任務執行完后通過執行糊掉,返回結果
  4. 客戶端接收到響應后,通過NettyClientHandler,會加油processResponseCommand方法處理,這時會根據請求id獲取之前的ResponseReature對象,執行回調,最后清除緩存。

3.2. invokeAsyncImpl 異步調用

相比同步調用,少了等待超時時間,但是增加了semaphore信號量控制最多有多少個連接同時執行。請求發起后,將結果對象緩存起來,結果將通過InvokeCallback進行回調,如果有設置回調函數,結果返回,在回調線程發起后就會將信號量回收,如果沒有設置回調函數,結果返回后就會將信號量回收。其余過程大致同同步調用類似。

Netty-RocketMQ底層通信的利器

 

3.3. invokeOnewayImpl 單步調用

單向請求,無結果,請求成功后不等待結果,直接釋放信號量,服務端也不會返回結果。

Netty-RocketMQ底層通信的利器

 

3.4. MQClientAPIImpl

MQClientAPIImpl在之前介紹過,主要為Producer和Consumer提供遠程通信調用的功能,內部主要是對NettyRemotingClient的封裝,以對外提供服務,如:

  1. createSubscriptionGroup,請求broker創建group
  2. createTopic,請求broker創建創建topic
  3. sendMessage,發送消息,單步、異步、同步

等多種服務的封裝。同時MQClientAPIImpl也能夠接收服務端的主動請求,從而進行響應,對外提供的具體功能如下,通過調用registerProcessor來添加:

Netty-RocketMQ底層通信的利器

 

以NOTIFY_CONSUMER_IDS_CHANGED為例,當Broker發現Group中的Consumer實例發生改變的時候,會遍歷客戶的連接Channel,然后逐一通知到客戶端。這時候客戶端的角色轉變為”服務端“,服務端轉變為"客戶端",兩端都會觸發processResponseCommand方法。

分享到:
標簽:Netty RocketMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定