前言
redis 為何能支持高并發?
Redis底層采用NIO中的多路IO復用的機制,對多個不同的連接(TCP)實現IO復用,很好地支持高并發,并且能實現線程安全。
Redis官方沒有windows版本,只有linux版本。
NIO在不同的操作系統上實現的方式有所不同,在Windows操作系統使用select實現輪訓,而且還存在空輪訓的情況,效率非常低。時間復雜度是為O(n)。其次默認對輪訓的數據有一定限制,所以難于支持上萬的TCP連接。
在Linux操作系統采用epoll實現事件驅動回調,不會存在空輪訓的情況,只對活躍的socket連接實現主動回調,這樣在性能上有大大的提升,時間復雜度是為O(1)。
Windows 操作系統是沒有epoll,只有Linux系統才有epoll。
這就是為什么Nginx、redis都能夠非常好的支持高并發,最終都是Linux中的IO多路復用機制epoll。
阻塞和非阻塞
阻塞和非阻塞通常形容多線程間的相互影響。比如一個線程占用了臨界區資源,那么其它所有需要這個資源的線程就必須在這個臨界區中進行等待,等待會導致線程掛起。這種情況就是阻塞。此時,如果占用資源的線程一直不愿意釋放資源,那么其它所有阻塞在這個臨界區上的線程都不能工作。而非阻塞允許多個線程同時進入臨界區。
阻塞調用是指調用結果返回之前,當前線程會被掛起。調用線程只有在得到結果之后才會返回。
非阻塞調用指在不能立刻得到結果之前,該調用不會阻塞當前線程。
BIO NIO AIO 概念
BIO(blocking IO):就是傳統的 JAVA.io 包,它是基于流模型實現的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調用是可靠的線性順序。優點是代碼比較簡單、直觀;缺點是 IO 的效率和擴展性很低,容易成為應用性能瓶頸。
NIO(non-blocking IO) :Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以構建多路復用的、同步非阻塞 IO 程序,同時提供了更接近操作系統底層高性能的數據操作方式。
AIO(Asynchronous IO) :是 Java 1.7 之后引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操作方式,所以人們叫它 AIO(Asynchronous IO),異步 IO 是基于事件和回調機制實現的,也就是應用操作之后會直接返回,不會堵塞在那里,當后臺處理完成,操作系統會通知相應的線程進行后續的操作。
NIO 講解
我們知道,BIO是阻塞式IO,是面向于流傳輸也即是根據每個字節實現傳輸,效率比較低;而NIO是同步非阻塞式的,式面向于緩沖區的,它的亮點是IO多路復用。
我們可以這樣理解IO多路復用,多路可以指有多個不同的TCP連接,復用是一個線程來維護多個不同的IO操作。所以它的好處是占用CPU資源非常小,而且線程安全。
NIO核心組件
管道channel:數據傳輸都是經過管道的。channel都是統一注冊到Selector上的。
選擇器Selector:也可稱為多路復用器。可以在單線程的情況下維護多個Channel,也可以維護多個連接。

BIO 和 NIO 代碼演示
傳統的BIO阻塞式Socket過程:
先啟動一個Socket服務端,此時控制臺會輸出開始等待接收數據中...,并等待客戶端連接。
package com.nobody;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Mr.nobody
* @Description
* @date 2020/7/4
*/
public class SocketTcpBIOServer {
private static byte[] bytes = new byte[1024];
public static void main(String[] args) {
try {
// 創建ServerSocket
final ServerSocket serverSocket = new ServerSocket();
// 綁定監聽端口號
serverSocket.bind(new InetSocketAddress(8080));
while (true) {
System.out.println("開始等待接收數據中...");
Socket accept = serverSocket.accept();
int read = 0;
read = accept.getInputStream().read(bytes);
String result = new String(bytes);
System.out.println("接收到數據:" + result);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.

再啟動一個Socket客戶端,先不進行輸入。
package com.nobody;
import java.io.IOException;
import java.net.*;
import java.util.Scanner;
/**
* @author Mr.nobody
* @Description
* @date 2020/7/4
*/
public class ClientTcpSocket {
public static void main(String[] args) {
Socket socket = new Socket();
try {
// 與服務端建立連接
SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
socket.connect(socketAddress);
while (true) {
Scanner scanner = new Scanner(System.in);
socket.getOutputStream().write(scanner.next().getBytes());
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.
再啟動另外一個Socket客戶端02,輸入client02。
package com.nobody;
import java.io.IOException;
import java.net.*;
import java.util.Scanner;
/**
* @author Mr.nobody
* @Description
* @date 2020/7/4
*/
public class ClientTcpSocket02 {
public static void main(String[] args) {
Socket socket = new Socket();
try {
// 與服務端建立連接
SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
socket.connect(socketAddress);
while (true) {
Scanner scanner = new Scanner(System.in);
socket.getOutputStream().write(scanner.next().getBytes());
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.

此時可以看到服務端沒有接收到數據,因為Socket客戶端01先連接,但是還未輸入數據,所以服務端一直等待客戶端01的輸入,導致客戶端02阻塞。
如果我們這時在客戶端01輸入client01,服務端控制臺顯示如下,先輸出客戶端01的數據,完成后才能輸出客戶端02的數據。

當然,如果不想后連接的客戶端不阻塞,可以使用多線程實現偽異步IO,只需將服務端代碼修改為如下:
public static void main(String[] args) {
try {
// 創建ServerSocket
final ServerSocket serverSocket = new ServerSocket();
// 綁定監聽端口號
serverSocket.bind(new InetSocketAddress(8080));
while (true) {
System.out.println("開始等待接收數據中...");
Socket accept = serverSocket.accept();
new Thread(new Runnable() {
@Override
public void run() {
int read = 0;
try {
read = accept.getInputStream().read(bytes);
} catch (IOException e) {
e.printStackTrace();
}
String result = new String(bytes);
System.out.println("接收到數據:" + result);
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.
當然上面代碼有個缺點是創建的線程會頻繁創建和銷毀,頻繁進行CPU調度,并且也消耗內存資源,可使用線程池機制優化。
NIO非阻塞式Socket過程:
前面兩個客戶端代碼不變,服務端代碼如下:
package com.nobody.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
/**
* @author Mr.nobody
* @Description
* @date 2020/7/4
*/
public class NioServer {
private Selector selector;
public void iniServer() {
try {
// 創建管道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 設置管道為非阻塞
serverSocketChannel.configureBlocking(false);
// 將管道綁定到8080端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 創建一個選擇器
this.selector = Selector.open();
// 將管道注冊到選擇器上,注冊為SelectionKey.OP_ACCEPT事件,
// 當事件到達后,selector.select()會返回,否則改方法會一直阻塞。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() throws IOException {
System.out.println("服務端啟動成功...");
// 輪詢訪問Selector
while (true) {
// 當事件到達后,selector.select()會返回,否則改方法會一直阻塞。
int select = selector.select(10);
// 沒有發送消息,跳過
if (0 == select) {
continue;
}
// selector中選中的注冊事件
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 刪除已選中的key,避免重復處理
iterator.remove();
if (key.isAcceptable()) { // 客戶端連接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 獲得與客戶端連接的管道
SocketChannel socketChannel = server.accept();
// 設置管道為非阻塞
socketChannel.configureBlocking(false);
// 與客戶端連接后,為了能接收到客戶端的消息,為管道設置可讀權限
socketChannel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 可讀事件
// 創建讀取數據的緩沖區
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(byteBuffer);
byte[] bytes = byteBuffer.array();
String msg = new String(bytes).trim();
System.out.println("服務端收到消息:" + msg);
ByteBuffer outByteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
// 回應消息給客戶端
channel.write(outByteBuffer);
}
}
}
}
public static void main(String[] args) throws IOException {
NioServer nioServer = new NioServer();
nioServer.iniServer();
nioServer.listen();
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.
啟動服務端,然后再啟動兩個客戶端,兩個客戶端都不會阻塞。
