服务端 监听
package com.hrd.netty.demo.jnio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* Created by hurd on 2016/1/26.
*/
public class NioServerListener extends Thread {
protected Selector selector;
protected SocketChannel socketChannel;
public NioServerListener(Selector selector){
this.selector = selector;
}
@Override
public void run(){
try {
//while循环监听事件
while(true){
//阻塞
selector.select();
//获取选择器中已经就绪的SelectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
//遍历
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//删除
iterator.remove();
//接受连接就绪事件
if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
socketChannel = ssc.accept();
//套接字通道设置为非阻塞模式
socketChannel.configureBlocking(false);
//向socket通道 注册读就绪事件
socketChannel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){
//SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int len = socketChannel.read(byteBuffer);
/*StringBuffer dataBuffer= new StringBuffer();
while( -1 != len){
//将写模式变为读模式
byteBuffer.flip();
CharBuffer charBuffer = byteBuffer.asCharBuffer();
dataBuffer.append(charBuffer.array());
byteBuffer.clear();
len = socketChannel.read(byteBuffer);
}*/
byteBuffer.flip();
//读取完毕
if(byteBuffer.limit()>0){
System.out.println("来自服务端消息:" + new String(byteBuffer.array()).trim());
}
}else if(key.isWritable()){
//暂时还没想明白这个写就绪事件干嘛用的。。。
System.out.println("暂不处理");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//打开一个 ServerSocketChannel实例 并设置为false
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//绑定ip +端口
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1",1024));
//打开一个选择器
Selector selector = Selector.open();
//向 serverSocketChannel 注册 接收就绪事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//开启线程进行监听
NioServerListener listener = new NioServerListener (selector);
listener.start();
//进行控制太输入 写事件 进行通讯
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while(true){
//java IO 阻塞读取数据
String data = bufferedReader.readLine();
if("exit".equals(data)){
if(null != listener.socketChannel){
listener.socketChannel.close();
}
System.out.println("主线程关闭.....");
System.exit(0);
}
ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
listener.socketChannel.write(buffer);
}
}
}
客户端 监听
package com.hrd.netty.demo.jnio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* Created by hurd on 2016/1/26.
*/
public class NioClientListener extends Thread {
public Selector selector;
public SocketChannel socketChannel;
public NioClientListener(Selector selector){
this.selector = selector;
}
@Override
public void run(){
try {
//while循环监听事件
while(true){
//阻塞
selector.select();
//获取选择器中已经就绪的SelectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
//遍历
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//删除
iterator.remove();
socketChannel = (SocketChannel) key.channel();
//套接字通道设置为非阻塞模式
socketChannel.configureBlocking(false);
//连接就绪事件
if(key.isConnectable()){
socketChannel = (SocketChannel) key.channel();
//判断连接是否完成
int i =0;
while(! socketChannel.finishConnect()){
if(++i>10){
throw new RuntimeException("socket连接超时");
}
System.out.println("sock连接未完成,等待中....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//向socket通道 注册读就绪事件
socketChannel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){
//SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int len = socketChannel.read(byteBuffer);
/*StringBuffer dataBuffer= new StringBuffer();
while( -1 != len){
//将写模式变为读模式
byteBuffer.flip();
CharBuffer charBuffer = byteBuffer.asCharBuffer();
dataBuffer.append(charBuffer.array());
byteBuffer.clear();
len = socketChannel.read(byteBuffer);
}*/
byteBuffer.flip();
//读取完毕
if(byteBuffer.limit()>0){
System.out.println("来自服务端消息:" + new String(byteBuffer.array()).trim());
}
}else if(key.isWritable()){
//暂时还没想明白这个写就绪事件干嘛用的。。。
System.out.println("暂不处理");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//打开一个SocketChannel实例 并设置为false
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
//绑定ip +端口
socketChannel.connect(new InetSocketAddress("127.0.0.1",1024));
//打开一个选择器
Selector selector = Selector.open();
//向 serverSocketChannel 注册 连接就绪事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//开启线程进行监听
NioClientListener listener = new NioClientListener (selector);
listener.start();
//进行控制太输入 写事件 进行通讯
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while(true){
//java IO 阻塞读取数据
String data = bufferedReader.readLine();
if("exit".equals(data)){
socketChannel.close();
System.out.println("主线程关闭.....");
System.exit(0);
}
ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
socketChannel.write(buffer);
}
}
} |
|