/**
 * Minimal single-reactor, single-thread NIO server.
 *
 * <p>One {@link Selector} multiplexes the listening socket and every accepted
 * connection. Each readable connection is drained, the received text is printed
 * to standard output, and the literal reply {@code "OK"} is written back.
 * All work happens on the thread that calls {@link #listen()}.
 */
public class SReactorSThread {

    /** TCP port the server binds to. */
    private static final int PORT = 6666;

    /** Capacity of the scratch buffer used for each individual read call. */
    private static final int READ_CHUNK_SIZE = 1024;

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and the non-blocking listening socket on {@link #PORT}
     * and registers the socket for accept events.
     *
     * @throws java.io.UncheckedIOException if the selector or the listening socket
     *         cannot be opened, bound or registered; anything opened so far is closed
     */
    public SReactorSThread() {
        Selector openedSelector = null;
        ServerSocketChannel openedServer = null;
        try {
            openedSelector = Selector.open();
            openedServer = ServerSocketChannel.open();
            openedServer.bind(new InetSocketAddress(PORT));
            openedServer.configureBlocking(false);
            openedServer.register(openedSelector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // Fail fast: a server without a selector/socket cannot do anything useful,
            // and leaving the fields null would only surface later as an NPE in listen().
            closeResource(openedServer);
            closeResource(openedSelector);
            throw new java.io.UncheckedIOException("Failed to start server on port " + PORT, e);
        }
        this.selector = openedSelector;
        this.serverSocketChannel = openedServer;
    }

    /**
     * Runs the event loop: blocks in {@code select()} and dispatches every ready key.
     *
     * <p>The loop only ends when selecting or dispatching throws; the exception is
     * printed and the method returns.
     */
    public void listen() {
        try {
            while (true) {
                // select() returns the number of keys whose ready set was updated;
                // zero means there is nothing new to dispatch.
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    // Remove before dispatching so the key never lingers in the
                    // selected set, even if the handler throws.
                    ready.remove();
                    dispatch(key);
                }
            }
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
        }
    }

    /**
     * Routes a ready key to the accept or read handler.
     *
     * @param key a key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey key) {
        // A key cancelled earlier in the same pass would throw
        // CancelledKeyException from the isXxx() checks below.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            accept(key);
        } else if (key.isReadable()) {
            handler(key);
        }
    }

    /**
     * Accepts a pending connection and registers it for read events.
     *
     * @param key the accept-ready key of the listening socket (not used directly)
     */
    private void accept(SelectionKey key) {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            // In non-blocking mode accept() returns null when no connection is pending.
            if (socketChannel == null) {
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak a connection that was accepted but could not be registered.
            closeResource(socketChannel);
        }
    }

    /**
     * Handles a readable connection: receive the message, process it (print it),
     * and send the reply.
     *
     * <p>If the peer has closed the connection, or any I/O error occurs, the
     * connection is reported as offline, its key is cancelled and it is closed.
     *
     * @param key the read-ready key of a client connection
     */
    private void handler(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            String msg = readMessage(channel);
            if (msg == null) {
                // End-of-stream: without deregistering, the key would stay readable
                // forever and the event loop would spin.
                disconnect(key, channel);
                return;
            }
            System.out.println("接收到消息:" + msg);
            ByteBuffer reply = ByteBuffer.wrap("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Single non-blocking write, as before; looping here could stall the
            // one and only reactor thread if the peer never drains its socket.
            channel.write(reply);
        } catch (IOException e) {
            disconnect(key, channel);
        }
    }

    /**
     * Reads everything currently available from {@code source} and decodes it as UTF-8.
     *
     * <p>Bytes are accumulated across reads and decoded once, so only bytes that
     * were actually received are decoded and multi-byte characters that straddle
     * two reads are not corrupted.
     *
     * @param source channel to drain; reading stops at the first read that returns
     *               zero (nothing more available) or a negative value (end-of-stream)
     * @return the decoded text (possibly empty), or {@code null} if end-of-stream
     *         was reached before any byte was read
     * @throws IOException if reading from the channel fails
     */
    static String readMessage(java.nio.channels.ReadableByteChannel source) throws IOException {
        java.io.ByteArrayOutputStream collected = new java.io.ByteArrayOutputStream();
        ByteBuffer chunk = ByteBuffer.allocate(READ_CHUNK_SIZE);
        int read;
        while ((read = source.read(chunk)) > 0) {
            // The buffer is cleared after every read, so the fresh bytes always
            // occupy [0, read) of the backing array.
            collected.write(chunk.array(), 0, read);
            chunk.clear();
        }
        if (read < 0 && collected.size() == 0) {
            return null;
        }
        return new String(collected.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Reports the client as offline, then cancels its key and closes its channel.
     * Cleanup runs even if the remote address can no longer be obtained.
     */
    private void disconnect(SelectionKey key, SocketChannel channel) {
        try {
            System.out.println(channel.getRemoteAddress() + "離線了");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            key.cancel();
            closeResource(channel);
        }
    }

    /** Closes {@code resource} if it is non-null, printing (not propagating) any failure. */
    private static void closeResource(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the server and runs its event loop on the calling thread. */
    public static void main(String[] args) {
        SReactorSThread sReactorSThread = new SReactorSThread();
        sReactorSThread.listen();
    }
}