websokct 使用.NETty 實現(xiàn)
websocket 是一個全雙工的長連接通訊
話不多說直接上代碼。。
服務端的代碼實現(xiàn)
public void init() throws InterruptedException {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workGroup).channel(NIOServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// http 編碼和解編碼 handler
pipeline.addLast(new HttpServerCodec());
// chunked 以塊的形式 handler
pipeline.addLast(new ChunkedWriteHandler());
// http 數(shù)據(jù)傳輸過程中是分段的, httpObjectAggregator 就可以給多個段做聚合操作,這就是瀏覽器發(fā)送大量數(shù)據(jù)時候,就會發(fā)出多次 http 請求
pipeline.addLast(new HttpObjectAggregator(8192));
// webSocket 是通過幀的形式傳遞的,可以看到 WebSocketFrame 六個子類,WebSocketServerProtocolHandler 他的主要功能就是可以把 http 協(xié)議升級為 ws 協(xié)議,保持長連接
pipeline.addLast(new WebSocketServerProtocolHandler("/msg"));
pipeline.addLast(new CustomWebSocketFrameTextHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(88).sync();
channelFuture.addListener(future -> {
if (future.isDone()) log.info("端口綁定成功");
});
channelFuture.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
服務端的邏輯會用到自定義handler的處理器
CustomWebSocketFrameTextHandler 這個自定義處理器。
處理器的代碼
/**
* 這里的 {@link TextWebSocketFrame} 表示一個文本幀
*
* @author L
*/
class CustomWebSocketFrameTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
Logger log = LoggerUtils.getLogger(CustomWebSocketFrameTextHandler.class);
/**
* 當 web 客戶端連接以后就會觸發(fā)這個方法
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// id 表示唯一的值信息,LongText 是唯一的。ShortText 不是唯一的
log.info("handlerAdded => {} 被調(diào)用", ctx.channel().id().asLongText());
log.info("handlerAdded => {} 被調(diào)用", ctx.channel().id().asShortText());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
String text = msg.text();
log.info("{}", text);
// 回復客戶端消息
ctx.writeAndFlush(new TextWebSocketFrame(text));
}
/**
* 斷開連接以后會觸發(fā)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("handlerRemoved => {} 被調(diào)用", ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("發(fā)生異常 => {}", cause.getMessage());
}
}
服務端這就可以運行了。然后需要一個客戶端來測試??蛻舳诵枰苯泳帉懸粋€ html 的簡單的小頁面來測試。
let webSocket = null;
if (window.WebSocket) {
// 邏輯
webSocket = new WebSocket("ws://localhost:88/msg");
webSocket.onopen = () => {
let showPlace = document.getElementById("responseMessage");
showPlace.value = "連接開啟";
}
webSocket.onmessage = function (p) {
let showPlace = document.getElementById("responseMessage");
showPlace.value = showPlace.value + "rn" + p.data;
}
webSocket.onclose = () => {
let showPlace = document.getElementById("responseMessage");
showPlace.value = showPlace.value + "rn" + "連接關閉";
}
webSocket.onerror = (p) => {
let showPlace = document.getElementById("responseMessage");
showPlace.value = showPlace.value + "rn" + "連接錯誤 => " + p.type;
}
} else alert("當前瀏覽器不支持操作");
function send(msg) {
if (webSocket.OPEN) {
webSocket.send(msg);
}
}
這樣就實現(xiàn)了 netty的 websocket 服務