A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

Netty 权威指南笔记(二):Java NIO 和 Netty 对比

Netty 是业界流行的 NIO 框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都说首屈一指的,也已经得到了成百上千商用项目的验证。Netty 框架都有什么优点呢?

API 使用简单,开发门槛低。
功能强大,预置多种编解码功能,支持多种主流协议。
定制能力强,可以通过 ChannelHandler 对通信框架灵活扩展。
性能高。
成熟稳定,社区活跃,已经修复了 Java NIO 所有的 Bug。
经历了大规模商业应用的考验,质量有保证。
Java NIO 开发

我们看一看在 笔记(一)里面的 Java NIO 示例程序,回顾一下使用 NIO 开发服务端程序的步骤:

创建 ServerSocketChannel 和业务处理线程池。
绑定监听端口,并配置为非阻塞模式。
创建 Selector,将之前创建的 ServerSocketChannel 注册到 Selector 上,监听 SelectionKey.OP_ACCEPT。
循环执行 Selector.select() 方法,轮询就绪的 Channel。
轮询就绪的 Channel 时,如果是处于 OP_ACCEPT 状态,说明是新的客户端接入,调用 ServerSocketChannel.accept 接收新的客户端。
设置新接入的 SocketChannel 为非阻塞模式,并注册到 Selector 上,监听 OP_READ。
如果轮询的 Channel 状态是 OP_READ,说明有新的就绪数据包需要读取,则构造 ByteBuffer 对象,读取数据。
Netty 开发

在讲 Netty 优点的时候,说到“Netty API 使用简单,开发门槛低”。先看看下面的 Netty TimeServer 示例程序,看看 Netty 开发都有哪些步骤?

创建 NIO 线程组 EventLoopGroup 和 ServerBootstrap。
设置 ServerBootstrap 的属性:线程组、SO_BACKLOG 选项,设置 NioServerSocketChannel 为 Channel,设置业务处理 Handler。
绑定端口,启动服务器程序。
在业务处理 TimeServerHandler 中,读取客户端发送的数据,并给出响应。
那么相比 Java NIO,使用 Netty 开发程序,都简化了哪些步骤呢?

OP_ACCEPT 的处理被简化,因为对于 accept 操作的处理在不同业务上都是一致的。
在 NIO 中需要自己构建 ByteBuffer 从 Channel 中读取数据,而 Netty 中数据是直接读取完成存放在 ByteBuf 中的。相当于省略了用户进程从内核中复制数据的过程。
在 Netty 中,我们看到有使用一个解码器 FixedLengthFrameDecoder,可以用于处理定长消息的问题,能够解决 TCP 粘包读半包问题,十分方便。
示例程序

服务器端程序

public class TimeServer {
    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 2014)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(16));
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(port).sync();

            System.out.println("start listening ...");

            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 11000;
        new TimeServer().bind(port);
    }
}


TimeServerHandler 业务处理类:

public class TimeServerHandler extends ChannelHandlerAdapter {
    private int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("time server receive: " + body + ", counter: " + counter++);

        String currentTime = "QUERY TIME ORDER".equals(body) ? new Date().toString() : "BADY ORDER";

        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    /**
     * 读完一批次数据,就会调用一次 channelReadComplete
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
        // flush 之后,之前 write 的数据,才会发送出去
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        cause.printStackTrace();
        ctx.close();
    }
}

客户端程序

public class TimeClient {

    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            List<VirtualMachineDescriptor> list = VirtualMachine.list();
            for (VirtualMachineDescriptor descriptor : list) {
                System.out.println(descriptor.displayName());
            }

            System.out.println("vm: " + list.size());

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(28));
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();

            channelFuture.channel().closeFuture().sync();

            System.out.println("end listening ...");
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 11000;
        new TimeClient().connect(port, "127.0.0.1");
    }
}

TimeClientHandler 客户端业务处理类:

public class TimeClientHandler extends ChannelHandlerAdapter {

    private final byte[] req;
    private int counter = 0;

    public TimeClientHandler() {
        System.out.println("init TimeClientHandler");
        req = "QUERY TIME ORDER".getBytes();

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 100; ++i) {
            ByteBuf firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
            ctx.writeAndFlush(firstMessage);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");

        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        System.out.println("now is: " + new String(req, "UTF-8") + ", counter: " + counter++);

        if (counter > 99) {
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        ctx.close();
    }

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马