日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

作者:vivo 互聯(lián)網(wǎng)服務(wù)器團(tuán)隊(duì)- Jin KAI

本文從JAVA NIO網(wǎng)絡(luò)編程的基礎(chǔ)知識(shí)講到了Tars框架使用NIO進(jìn)行網(wǎng)絡(luò)編程的源碼分析。

一、Tars框架基本介紹

Tars是騰訊開(kāi)源的支持多語(yǔ)言的高性能RPC框架,起源于騰訊內(nèi)部2008年至今一直使用的統(tǒng)一應(yīng)用框架TAF(Total Application Framework),目前支持C++、Java、php、Nodejs、Go語(yǔ)言。

該框架為用戶(hù)提供了涉及到開(kāi)發(fā)、運(yùn)維、以及測(cè)試的一整套解決方案,幫助一個(gè)產(chǎn)品或者服務(wù)快速開(kāi)發(fā)、部署、測(cè)試、上線(xiàn)。它集可擴(kuò)展協(xié)議編解碼、高性能RPC通信框架、名字路由與發(fā)現(xiàn)、發(fā)布監(jiān)控、日志統(tǒng)計(jì)、配置管理等于一體,通過(guò)它可以快速用微服務(wù)的方式構(gòu)建自己的穩(wěn)定可靠的分布式應(yīng)用,并實(shí)現(xiàn)完整有效的服務(wù)治理。

官方倉(cāng)庫(kù)地址:

https://Github.com/TarsCloud/Tars

vivo推送平臺(tái)也深度使用了該框架,部署服務(wù)節(jié)點(diǎn)超過(guò)一千個(gè),經(jīng)過(guò)線(xiàn)上每日一百多億消息推送量的考驗(yàn)。

此前已在vivo互聯(lián)網(wǎng)技術(shù)公眾號(hào)發(fā)布過(guò)《 Tars Java 客戶(hù)端源碼分析 》此篇文章為續(xù)集。

Tars-java 最新穩(wěn)定版1.7.2以及之前的版本都使用Java NIO進(jìn)行網(wǎng)絡(luò)編程;本文將分別詳細(xì)介紹java NIO的原理和Tars 使用NIO進(jìn)行網(wǎng)絡(luò)編程的細(xì)節(jié)。

二、Java NIO原理介紹

從1.4版本開(kāi)始,Java提供了一種新的IO處理方式:NIO (New IO 或 Non-blocking IO) 是一個(gè)可以替代標(biāo)準(zhǔn)Java IO 的API,它是面向緩沖區(qū)而不是字節(jié)流,它是非阻塞的,支持IO多路復(fù)用。

2.1 Channels (通道) and Buffers (緩沖區(qū))

標(biāo)準(zhǔn)的IO基于字節(jié)流進(jìn)行操作的,而NIO是基于通道(Channel)和緩沖區(qū)(Buffer)進(jìn)行操作。數(shù)據(jù)總是從通道讀取到緩沖區(qū)中,或者從緩沖區(qū)寫(xiě)入到通道中,下圖是一個(gè)完整流程。

Channel類(lèi)型:

  1. 支持文件讀寫(xiě)數(shù)據(jù)的FileChannel
  2. 能通過(guò)UDP讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)的DatagramChannel
  3. 能通過(guò)TCP讀寫(xiě)網(wǎng)絡(luò)數(shù)據(jù)的SocketChannel
  4. 可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接,對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè)SocketChannel的ServerSocketChannel 。

SocketChannel:

  • 打開(kāi) SocketChannel: SocketChannel socketChannel = SocketChannel.open;
  • 關(guān)閉 SocketChannel: socketChannel.close;
  • 從Channel中讀取的數(shù)據(jù)放到Buffer: int bytesRead = inChannel.read(buf);
  • 將Buffer中的數(shù)據(jù)寫(xiě)到Channel: int bytesWritten = inChannel.write(buf);

ServerSocketChannel:

通過(guò) ServerSocketChannel.accept 方法監(jiān)聽(tīng)新進(jìn)來(lái)的連接,當(dāng)accept方法返回的時(shí)候,它返回一個(gè)包含新進(jìn)來(lái)的連接的SocketChannel,因此accept方法會(huì)一直阻塞到有新連接到達(dá)。

通常不會(huì)僅僅只監(jiān)聽(tīng)一個(gè)連接,在while循環(huán)中調(diào)用 accept方法. 如下面的例子:

代碼1:

while( true){ SocketChannel socketChannel = serverSocketChannel.accept; //do something with socketChannel... }

ServerSocketChannel可以設(shè)置成非阻塞模式。在非阻塞模式下,accept 方法會(huì)立刻返回,如果還沒(méi)有新進(jìn)來(lái)的連接,返回的將是null。因此,需要檢查返回的SocketChannel是否是null。

代碼2:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open; serverSocketChannel.socket.bind( new.NETSocketAddress( 8888)); serverSocketChannel.configureBlocking( false); while( true){ SocketChannel socketChannel = serverSocketChannel.accept; if(socketChannel != null){ //do something with socketChannel... } }

Buffer類(lèi)型:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer的分配:

ByteBuffer buf = ByteBuffer.allocate(2048);

Buffer的讀寫(xiě):

一般是以下四個(gè)步驟:

  1. 寫(xiě)入數(shù)據(jù)到Buffer,最大寫(xiě)入量是capacity,寫(xiě)模式下limit值即為capacity值,position即為寫(xiě)到的位置。
  2. 調(diào)用flip方法將Buffer從寫(xiě)模式切換到讀模式,此時(shí)position移動(dòng)到開(kāi)始位置0,limit移動(dòng)到position的位置。
  3. 從Buffer中讀取數(shù)據(jù),在讀模式下可以讀取之前寫(xiě)入到buffer的所有數(shù)據(jù),即為limit位置。
  4. 調(diào)用clear方法或者compact方法。clear方法將position設(shè)為0,limit被設(shè)置成capacity的值。compact方法將所有未讀的數(shù)據(jù)拷貝到Buffer起始處,然后將position設(shè)到最后一個(gè)未讀元素后面。

mark 與 reset方法

通過(guò)調(diào)用Buffer.mark方法,可以標(biāo)記Buffer中的一個(gè)特定position,之后可以通過(guò)調(diào)用Buffer.reset方法恢復(fù)到這個(gè)position。

duplicate

此方法返回承載先前字節(jié)緩沖區(qū)內(nèi)容的新字節(jié)緩沖區(qū)。

remaining

limit 減去 position的值

2.2 Selector(選擇器)

Java NIO引入了選擇器的概念,選擇器用于監(jiān)聽(tīng)多個(gè)通道的事件。單個(gè)的線(xiàn)程可以監(jiān)聽(tīng)多個(gè)數(shù)據(jù)通道。要使用Selector,得向Selector注冊(cè)Channel,然后調(diào)用它的select方法。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。一旦這個(gè)方法返回,線(xiàn)程就可以處理這些事件。

線(xiàn)程使用一個(gè)selector處理多個(gè)channel

代碼3:

channel.configureBlocking( false); SelectionKey key = channel. register(selector,Selectionkey.OP_READ);

注意register方法的第二個(gè)參數(shù),這是一個(gè)監(jiān)聽(tīng)的集合,即在通過(guò)Selector監(jiān)聽(tīng)Channel時(shí)關(guān)注什么事件集合。

SelectionKey包含:

1) interest集合:selectionKey.interestOps 可以監(jiān)聽(tīng)四種不同類(lèi)型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ

2) ready集合:selectionKey.readyOps; ready 集合是通道已經(jīng)準(zhǔn)備就緒的操作的集合,提供4個(gè)方便的方法:

  • selectionKey.isAcceptable;
  • selectionKey.isConnectable;
  • selectionKey.isReadable;
  • selectionKey.isWritable;

3) Channel:selectionKey.channel;

4) Selector:selectionKey.selector;

5) 可選的附加對(duì)象:

提示:

OP_ACCEPT和OP_CONNECT的區(qū)別:簡(jiǎn)單來(lái)說(shuō),客戶(hù)端建立連接是connect,服務(wù)器準(zhǔn)備接收連接是accept。一個(gè)典型的客戶(hù)端服務(wù)器網(wǎng)絡(luò)交互流程如下圖

selectedKeys

一旦調(diào)用了select方法,并且返回值表明有一個(gè)或更多個(gè)通道就緒了,然后可以通過(guò)調(diào)用selector的selectedKeys方法,訪(fǎng)問(wèn)已選擇鍵集(selected key set)中的就緒通道。

wakeUp

某個(gè)線(xiàn)程調(diào)用select方法后阻塞了,即使沒(méi)有通道已經(jīng)就緒,也有辦法讓其從select方法返回。只要讓其它線(xiàn)程在阻塞線(xiàn)程調(diào)用select方法的對(duì)象上調(diào)用Selector.wakeup方法即可。阻塞在select方法上的線(xiàn)程會(huì)立馬返回。如果有其它線(xiàn)程調(diào)用了wakeup方法,但當(dāng)前沒(méi)有線(xiàn)程阻塞在select方法上,下個(gè)調(diào)用select方法的線(xiàn)程會(huì)立即wake up。

close

用完Selector后調(diào)用其close方法會(huì)關(guān)閉該Selector,且使注冊(cè)到該Selector上的所有SelectionKey實(shí)例無(wú)效。通道本身并不會(huì)關(guān)閉。

通過(guò)Selector選擇通道:

  • int select 阻塞直到至少有一個(gè)通道在你注冊(cè)的事件上就緒了
  • int select(long timeout) 增加最長(zhǎng)阻塞毫秒數(shù)
  • int selectNow 不會(huì)阻塞,不管什么通道就緒都立刻返回

三、 Tars NIO網(wǎng)絡(luò)編程

了解完 Java NIO的原理,我們來(lái)看看Tars是如何使用NIO進(jìn)行網(wǎng)絡(luò)編程的。

Tars的網(wǎng)絡(luò)模型是多reactor多線(xiàn)程模型。有一點(diǎn)特殊的是tars的reactor線(xiàn)程組里隨機(jī)選一個(gè)線(xiàn)程處理網(wǎng)絡(luò)事件,并且該線(xiàn)程同時(shí)也能處理讀寫(xiě)。

核心類(lèi)之間的關(guān)系如下:

3.1 一個(gè)典型的Java NIO服務(wù)端開(kāi)發(fā)流程

  1. 創(chuàng)建ServerSocketChannel,設(shè)置為非阻塞,并綁定端口
  2. 創(chuàng)建Selector對(duì)象
  3. 給ServerSocketChannel注冊(cè)SelectionKey.OP_ACCEPT事件
  4. 啟動(dòng)一個(gè)線(xiàn)程循環(huán),調(diào)用Selector的select方法來(lái)檢查IO就緒事件,一旦有IO就緒事件,就通知用戶(hù)線(xiàn)程去處理IO事件
  5. 如果有Accept事件,就創(chuàng)建一個(gè)SocketChannel,并注冊(cè)SelectionKey.OP_READ
  6. 如果有讀事件,判斷一下是否全包,如果全包,就交給后端線(xiàn)程處理
  7. 寫(xiě)事件比較特殊。isWriteable表示的是本機(jī)的寫(xiě)緩沖區(qū)是否可寫(xiě)。這個(gè)在絕大多少情況下都是為真的。在Netty中只有寫(xiě)半包的時(shí)候才需要注冊(cè)寫(xiě)事件,如果一次寫(xiě)就完全把數(shù)據(jù)寫(xiě)入了緩沖區(qū)就不需要注冊(cè)寫(xiě)事件。

3.2 Tars客戶(hù)端發(fā)起請(qǐng)求到服務(wù)器的流程

  1. Communicator.stringToProxy 根據(jù)servantName等配置信息創(chuàng)建通信器。
  2. ServantProxyFactory.getServantProxy 調(diào)用工廠(chǎng)方法創(chuàng)建servant代理。
  3. ObjectProxyFactory.getObjectProxy 調(diào)用工廠(chǎng)方法創(chuàng)建obj代理。
  4. TarsProtocolInvoker.create 創(chuàng)建協(xié)議調(diào)用者。
  5. ServantProtocolInvoker.initClient(Url url) 根據(jù)servantProxyConfig中的配置信息找到servant的ip端口等進(jìn)行初始化ServantClient。
  6. ClientPoolManager.getSelectorManager 如果第一次調(diào)用selectorManager是空的就會(huì)去初始化selectorManager。
  7. reactorSet = new Reactor[selectorPoolSize]; SelectorManager初始化構(gòu)造類(lèi)中的會(huì)根據(jù)selectorPoolSize(默認(rèn)是2)的配置創(chuàng)建Reactor線(xiàn)程數(shù)組。線(xiàn)程名稱(chēng)的前綴是servant-proxy-加上CommunicatorId,CommunicatorId生成規(guī)則是由locator的地址生成的UUID。
  8. 啟動(dòng)reactor線(xiàn)程。

3.3 Tars服務(wù)端啟動(dòng)步驟

  1. tars支持TCP和UDP兩種協(xié)議,RPC場(chǎng)景下是使用TCP協(xié)議。
  2. new SelectorManager 根據(jù)配置信息初始化selectorManager,線(xiàn)程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;線(xiàn)程名稱(chēng)前綴是server-tcp-reactor,然后啟動(dòng)reactor線(xiàn)程數(shù)組中的所有線(xiàn)程。
  3. 開(kāi)啟服務(wù)端監(jiān)聽(tīng)的ServerSocketChannel,綁定服務(wù)端本地ip和監(jiān)聽(tīng)的端口號(hào),設(shè)置TCP連接請(qǐng)求隊(duì)列的最大容量為1024;設(shè)置非阻塞模式。
  4. 選取reactor線(xiàn)程數(shù)組中第0個(gè)線(xiàn)程作為服務(wù)端監(jiān)聽(tīng)連接OP_ACCEPT就緒事件的線(xiàn)程。

代碼4:

publicvoidbind( AppService appService) throws IOException { // 此處略去非關(guān)鍵代碼 if(endpoint.type. equals( "tcp")) { // 1 this.selectorManager = newSelectorManager(Utils.getSelectorPoolSize, newServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2 this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay); this.selectorManager.start; ServerSocketChannel serverChannel = ServerSocketChannel.open; serverChannel.socket.bind( newInetSocketAddress(endpoint.host, endpoint.port), 1024); // 3 serverChannel.configureBlocking( false); selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4 } elseif(endpoint.type. equals( "udp")) { this.selectorManager = newSelectorManager( 1, newServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true); this.selectorManager.start; // UDP開(kāi)啟的是DatagramChannel DatagramChannel serverChannel = DatagramChannel.open; DatagramSocket socket = serverChannel.socket; socket.bind( newInetSocketAddress(endpoint.host, endpoint.port)); serverChannel.configureBlocking( false); // UDP協(xié)議不需要建連,監(jiān)聽(tīng)的是OP_READ就緒事件 this.selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_READ); } }

3.4 Reactor線(xiàn)程啟動(dòng)流程

  1. 多路復(fù)用器開(kāi)始輪詢(xún)檢查 是否有就緒的事件。
  2. 處理register隊(duì)列中剩余的channel注冊(cè)到當(dāng)前reactor線(xiàn)程的多路復(fù)用器selector中。
  3. 獲取已選鍵集中所有就緒的channel。
  4. 更新Session中最近操作時(shí)間,Tars服務(wù)端啟動(dòng)時(shí)會(huì)調(diào)用 startSessionManager , 單線(xiàn)程每30s掃描一次session會(huì)話(huà)列表,會(huì)檢查每個(gè)session的 lastUpdateOperationTime 與當(dāng)前時(shí)間的時(shí)間差,如果超過(guò)60秒會(huì)將過(guò)期session對(duì)應(yīng)的channel踢除。
  5. 分發(fā)IO事件進(jìn)行處理。
  6. 處理unregister隊(duì)列中剩余的channel,從當(dāng)前reactor線(xiàn)程的多路復(fù)用器selector中解除注冊(cè)。

代碼5:

publicvoidrun( ) { while(!Thread.interrupted) { selector. select; // 1 processRegister; // 2 Iterator<SelectionKey> iter = selector.selectedKeys.iterator; // 3 while(iter.hasNext) { SelectionKey key = iter.next; iter. remove; if(!key.isValid) continue; try{ if(key.attachment != null&& key.attachment instanceof Session) { ((Session) key.attachment).updateLastOperationTime; //4 } dispatchEvent(key); // 5 } catch(Throwable ex) { disConnectWithException(key, ex); } } processUnRegister; // 6 } }

3.5 IO事件分發(fā)處理

每個(gè)reactor線(xiàn)程都有一個(gè)專(zhuān)門(mén)的Accepter類(lèi)去處理各種IO事件。TCPAccepter可以處理全部的四種事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由于不需要建立連接所以只需要處理讀和寫(xiě)兩種事件。

1. 處理OP_ACCEPT

  1. 獲取channel,處理TCP請(qǐng)求。
  2. 為這個(gè)TCP請(qǐng)求創(chuàng)建TCPSession,會(huì)話(huà)的狀態(tài)是服務(wù)器已連接
  3. 會(huì)話(huà)注冊(cè)到sessionManager中,Tars服務(wù)可配置最大連接數(shù)maxconns,如果超過(guò)就會(huì)關(guān)閉當(dāng)前會(huì)話(huà)。
  4. 尋找下一個(gè)reactor線(xiàn)程進(jìn)行多路復(fù)用器與channel的綁定。

代碼6:

publicvoidhandleAcceptEvent(SelectionKey key)throwsIOException { ServerSocketChannel server = (ServerSocketChannel) key.channel; // 1 SocketChannel channel = server.accept; channel.socket.setTcpNoDelay(selectorManager.isTcpNoDelay); channel.configureBlocking( false); Utils.setQosFlag(channel.socket); TCPSession session = newTCPSession(selectorManager); // 2 session.setChannel(channel); session.setStatus(SessionStatus.SERVER_CONNECTED); session.setKeepAlive(selectorManager.isKeepAlive); session.setTcpNoDelay(selectorManager.isTcpNoDelay); SessionManager.getSessionManager.registerSession(session); // 3 selectorManager.nextReactor.registerChannel(channel, SelectionKey.OP_READ, session); // 4 }

2. 處理OP_CONNECT

  1. 獲取客戶(hù)端連接過(guò)來(lái)的channel通道
  2. 獲取Session
  3. 與服務(wù)器建立連接,將關(guān)注的興趣OPS設(shè)置為ready就緒事件,session中的狀態(tài)修改為客戶(hù)端已連接

代碼7:

publicvoidhandleConnectEvent(SelectionKey key)throwsIOException { SocketChannel client = (SocketChannel) key.channel; // 1 TCPSession session = (TCPSession) key.attachment; //2 if(session == null) thrownewRuntimeException( "The session is null when connecting to ..."); try{ // 3 client.finishConnect; key.interestOps(SelectionKey.OP_READ); session.setStatus(SessionStatus.CLIENT_CONNECTED); } finally{ session.finishConnect; } }

3.處理OP_WRITE、 處理OP_READ

調(diào)用session.read和session.doWrite 方法處理讀寫(xiě)事件

代碼8:

publicvoidhandleReadEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when reading data..."); session.read; } publicvoidhandleWriteEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when writing data..."); session.doWrite; }

3.6 seesion中網(wǎng)絡(luò)讀寫(xiě)的事件詳細(xì)處理過(guò)程

1. 讀事件處理

申請(qǐng)2k的ByteBuffer空間,讀取channel中的數(shù)據(jù)到readBuffer中。根據(jù)sessionStatus判斷是客戶(hù)端讀響應(yīng)還是服務(wù)器讀請(qǐng)求,分別進(jìn)行處理。

代碼9:

protectedvoid read throws IOException { int ret = readChannel; if( this.status == SessionStatus.CLIENT_CONNECTED) { readResponse; } elseif( this.status == SessionStatus.SERVER_CONNECTED) { readRequest; } else{ thrownew IllegalStateException( "The current session status is invalid. [status:"+ this.status + "]"); } if(ret < 0) { close; return; } } privateint readChannel throws IOException { int readBytes = 0, ret = 0; ByteBuffer data= ByteBuffer.allocate( 1024* 2); // 1 if(readBuffer == null) { readBuffer = IoBuffer.allocate(bufferSize); } // 2 while((ret = ((SocketChannel) channel).read( data)) > 0) { data.flip; // 3 readBytes += data.remaining; readBuffer.put( data.array, data.position, data.remaining); data.clear; } returnret < 0? ret : readBytes; }

① 客戶(hù)端讀響應(yīng)

從當(dāng)前readBuffer中的內(nèi)容復(fù)制到一個(gè)新的臨時(shí)buffer中,并且切換到讀模式,使用TarsCodec類(lèi)解析出buffer內(nèi)的協(xié)議字段到response,WorkThread線(xiàn)程通知Ticket處理response。如果response為空,則重置tempBuffer到mark的位置,重新解析協(xié)議。

代碼10:

publicvoidreadResponse( ) { Response response = null; IoBuffer tempBuffer = null; tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { response = selectorManager.getProtocolFactory.getDecoder.decodeResponse(tempBuffer, this); } else{ response = null; } if(response != null) { if(response.getTicketNumber == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession.hashCode); selectorManager.getThreadPool.execute( newWorkThread(response, selectorManager)); } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }

② 服務(wù)器讀請(qǐng)求

任務(wù)放入線(xiàn)程池交給 WorkThread線(xiàn)程,最終交給Processor類(lèi)出構(gòu)建請(qǐng)求的響應(yīng)體,包括分布式上下文,然后經(jīng)過(guò)FilterChain的處理,最終通過(guò)jdk提供的反射方法invoke服務(wù)端本地的方法然后返回response。如果線(xiàn)程池拋出拒絕異常,則返回SERVEROVERLOAD = -9,服務(wù)端過(guò)載保護(hù)。如果request為空,則重置tempBuffer到mark的位置,重新解析協(xié)議。

代碼11:

publicvoidreadRequest( ) { Request request = null; IoBuffer tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { request = selectorManager.getProtocolFactory.getDecoder.decodeRequest(tempBuffer, this); } else{ request = null; } if(request != null) { try{ request.resetBornTime; selectorManager.getThreadPool.execute( newWorkThread(request, selectorManager)); } catch(RejectedExecutionException e) { selectorManager.getProcessor.overload(request, request.getIOSession); } catch(Exception ex) { ex.printStackTrace; } } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }

2. 寫(xiě)事件處理

同樣也包括客戶(hù)端寫(xiě)請(qǐng)求和服務(wù)端寫(xiě)響應(yīng)兩種,其實(shí)這兩種都是往TCPSession中的LinkedBlockingQueue(有界隊(duì)列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最終會(huì)由TCPAcceptor中的handleWriteEvent監(jiān)聽(tīng)寫(xiě)就緒事件并消費(fèi)。

代碼12:

protectedvoidwrite(IoBuffer buffer)throwsIOException { if(buffer == null) return; if(channel == null|| key == null) thrownewIOException( "Connection is closed"); if(! this.queue.offer(buffer.buf)) { thrownewIOException( "The session queue is full. [ queue size:"+ queue.size + " ]"); } if(key != null) { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); key.selector.wakeup; } }

四、總結(jié)

本文主要介紹了Java NIO編程的基礎(chǔ)知識(shí) 和 Tars-Java 1.7.2版本的網(wǎng)絡(luò)編程模塊的源碼實(shí)現(xiàn)。

在最新的Tars-Java的master分支中我們可以發(fā)現(xiàn)網(wǎng)絡(luò)編程已經(jīng)由NIO改成了Netty,雖然Netty更加成熟穩(wěn)定,但是作為學(xué)習(xí)者了解NIO的原理也是掌握網(wǎng)絡(luò)編程的必經(jīng)之路。

更多關(guān)于Tars框架的介紹可以訪(fǎng)問(wèn):

https://tarscloud.org/

本文分析源碼地址(v1.7.x分支):

https://github.com/TarsCloud/TarsJava

2023 源創(chuàng)會(huì)線(xiàn)下重啟,基礎(chǔ)軟件技術(shù)面面談。

分享到:
標(biāo)簽:網(wǎng)絡(luò)編程
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定