A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

Apache Mina是一个NIO库,实现非阻塞式通讯。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。

使用步骤:

1、配置Maven

2、配置spring

3、实现编码工厂ProtocolCodecFactory

4、继承编码器适配器ProtocolEncoderAdapter

5、实现解码器ProtocolDecoder

6、继承处理器IoHandlerAdapter

7、实现心跳工厂KeepAliveMessageFactory

8、客户端启动

简单介绍:

1、Mina工作流程:

2、3个重要元素

3、IoSession常用方法

4、IoHandler 常用方法

1、配置Maven

<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-integration-beans</artifactId>
    <version>2.0.16</version>
</dependency>
<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
    <version>2.0.16</version>
</dependency>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.felix</groupId>
            <artifactId>maven-bundle-plugin</artifactId>
            <extensions>true</extensions>
        </plugin>
    </plugins>
</build>
2、配置spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
    default-lazy-init="false">

    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
        <property name="customEditors">
            <map>
                <entry key="java.net.SocketAddress"
                    value="org.apache.mina.integration.beans.InetSocketAddressEditor"></entry>
            </map>
        </property>
    </bean>

    <bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"
        init-method="bind" destroy-method="unbind">
        <!--端口号 -->
        <property name="defaultLocalAddress" value=":8888"></property>
        <!--绑定自己实现的handler -->
        <property name="handler" ref="serverHandler"></property>
        <!--声明过滤器的集合 -->
        <property name="filterChainBuilder" ref="filterChainBuilder"></property>
        <property name="reuseAddress" value="true" />
    </bean>

    <bean id="filterChainBuilder"
        class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
        <property name="filters">
            <map>
                <!--mina自带的线程池filter -->
                <entry key="executor" value-ref="executorFilter"></entry>
                <entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" />
                <!--自己实现的编解码器filter -->
                <entry key="codecFilter" value-ref="codecFilter" />
                <!--日志的filter -->
                <entry key="loggingFilter" value-ref="loggingFilter" />
                <!--心跳filter -->
                <entry key="keepAliveFilter" value-ref="keepAliveFilter" />
            </map>
        </property>
    </bean>

    <!-- executorFilter多线程处理 -->
    <bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" />
    <bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter">
        <constructor-arg value="remoteAddress" />
    </bean>

    <!--日志 -->
    <bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" />

    <!--编解码 -->
    <bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
        <constructor-arg>
            <!--构造函数的参数传入自己实现的对象 -->
            <bean class="com.onion.mina.server.NSMinaCodeFactory"></bean>
        </constructor-arg>
    </bean>

    <!--心跳检测filter -->
    <bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter">
        <!--构造函数的第一个参数传入自己实现的工厂 -->
        <constructor-arg>
            <bean class="com.onion.mina.server.NSMinaKeepAliveMessageFactory"></bean>
        </constructor-arg>
        <!--第二个参数需要的是IdleStatus对象,value值设置为读写空闲 -->
        <constructor-arg type="org.apache.mina.core.session.IdleStatus"
            value="BOTH_IDLE">
        </constructor-arg>
        <!--心跳频率,不设置则默认5 -->
        <property name="requestInterval" value="1500" />
        <!--心跳超时时间,不设置则默认30s -->
        <property name="requestTimeout" value="30" />
        <!--默认false,比如在心跳频率为5s时,实际上每5s会触发一次KeepAliveFilter中的session_idle事件,
        该事件中开始发送心跳包。当此参数设置为false时,对于session_idle事件不再传递给其他filter,如果设置为true,
        则会传递给其他filter,例如handler中的session_idle事件,此时也会被触发-->
        <property name="forwardEvent" value="true" />
    </bean>

    <!--自己实现的handler-->
    <bean id="serverHandler" class="com.onion.mina.server.NSMinaHandler" />
</beans>
3、实现编码工厂ProtocolCodecFactory

public class NSMinaCodeFactory implements ProtocolCodecFactory {
    private final NSProtocalEncoder encoder;
    private final NSProtocalDecoder decoder;

    public NSMinaCodeFactory() {
        this(Charset.forName("utf-8"));
    }

    public NSMinaCodeFactory(Charset charset) {
        encoder = new NSProtocalEncoder();
        decoder = new NSProtocalDecoder();
    }

    public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
        // TODO Auto-generated method stub
        return decoder;
    }

    public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
        // TODO Auto-generated method stub
        return encoder;
    }
}
4、继承编码器适配器ProtocolEncoderAdapter

public class NSProtocalEncoder extends ProtocolEncoderAdapter {
    private static final Logger logger = Logger.getLogger(NSProtocalEncoder.class);

    @SuppressWarnings("unused")
    private final Charset charset = Charset.forName("GBK");


    /**
     * 在此处实现包的编码工作,并把它写入输出流中
     */
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
        // TODO Auto-generated method stub
        if(message instanceof BaseMessageForClient){
            BaseMessageForClient clientmessage = (BaseMessageForClient)message;
            byte[] packhead_arr = clientmessage.getPackHead().getBytes(charset);//包头2个字节
            byte[] length_arr = ByteTools.intToByteArray(clientmessage.getLength()+19, 2);//包长
            byte[] funcid_arr = ByteTools.intToByteArray(clientmessage.getFuncid(), 1);//协议类型
            byte[] packetIdCode_arr = ByteTools.longToByteArray(clientmessage.getPacketIdCode(), 8);//数据包标识码
            byte[] content_arr = clientmessage.getContent().getBytes(charset);//内容
            byte[] checkcode_arr = ByteTools.longToByteArray(clientmessage.getCheckCode(), 4);//校验码
            byte[] packtail_arr = clientmessage.getPackTail().getBytes();//包尾
            IoBuffer buffer = IoBuffer.allocate(packhead_arr.length + length_arr.length + funcid_arr.length + packetIdCode_arr.length+ content_arr.length  + checkcode_arr.length + packtail_arr.length);
            buffer.setAutoExpand(true);
            buffer.put(packhead_arr);
            buffer.put(length_arr);
            buffer.put(funcid_arr);
            buffer.put(packetIdCode_arr);
            buffer.put(content_arr);
            buffer.put(checkcode_arr);
            buffer.put(packtail_arr);
            buffer.flip();
            out.write(buffer);
            out.flush();

            buffer.free();
        }else{
            String value = (String)message;
            logger.warn("encode message:" + message);
            IoBuffer buffer = IoBuffer.allocate(value.getBytes().length);
            buffer.setAutoExpand(true);
            if(value != null){
                buffer.put(value.trim().getBytes());
            }
            buffer.flip();
            out.write(buffer);
            out.flush();

            buffer.free();
        }
    }
}
5、实现解码器ProtocolDecoder

public class NSProtocalDecoder implements ProtocolDecoder {

    private static final Logger logger = Logger.getLogger(NSProtocalDecoder.class);
    private final AttributeKey context = new AttributeKey(getClass(), "context");
    private final Charset charset = Charset.forName("GBK");
    private final String PACK_HEAD = "$$"; //包头
    private final String PACK_TAIL = "\r\n";  //包尾

    // 请求报文的最大长度 100k
    private int maxPackLength = 102400;

    public int getMaxPackLength() {
        return maxPackLength;
    }

    public void setMaxPackLength(int maxPackLength) {
        if (maxPackLength <= 0) {
            throw new IllegalArgumentException("请求报文最大长度:" + maxPackLength);
        }
        this.maxPackLength = maxPackLength;
    }

    private Context getContext(IoSession session) {
        Context ctx;
        ctx = (Context) session.getAttribute(context);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(context, ctx);
        }
        return ctx;
    }

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        Long start = System.currentTimeMillis();
        // 报文前缀长度 包头2个字节,包长2个字节,协议类型2个字节,数据包标识码8个字节,校验码4个字节,包尾2个字节
        final int packHeadLength = 19;
        // 先获取上次的处理上下文,其中可能有未处理完的数据
        Context ctx = getContext(session);
        // 先把当前buffer中的数据追加到Context的buffer当中
        ctx.append(in);
        // 把position指向0位置,把limit指向原来的position位置
        IoBuffer buf = ctx.getBuffer();
        buf.flip();
        // 然后按数据包的协议进行读取
        while (buf.remaining() >= packHeadLength) {
            logger.debug("test 长度1:" + buf.remaining());
            buf.mark();
            // 读取包头 2个字节
            String packhead = new String(new byte[]{buf.get(),buf.get()});
            logger.debug("包头:" + packhead);
            if(PACK_HEAD.equals(packhead)){
                //读取包的长度 2个字节 报文的长度,不包含包头和包尾
                byte[] length_byte = new byte[]{buf.get(),buf.get()};
                byte[] length_byte_arr = new byte[]{0,0,0,0};
                length_byte_arr[2] = length_byte[0];
                length_byte_arr[3] = length_byte[1];
                int length = ByteTools.byteArrayToInt(length_byte_arr);
                logger.debug("长度:" + length);
                logger.debug("test 长度1:" + buf.remaining());
                // 检查读取是否正常,不正常的话清空buffer
                if (length < 0 || length > maxPackLength) {
                    logger.debug("报文长度[" + length + "] 超过最大长度:" + maxPackLength
                            + "或者小于0,清空buffer");
                    buf.clear();
                    break;
                    //packHeadLength - 2 :减去包尾的长度,
                    //length - 2 <= buf.remaining() :代表length-本身长度占用的两个字节-包头长度
                }else if(length >= packHeadLength && length - 4 <= buf.remaining()){
                    //读取协议类型2个字节
                    byte[] funcid_byte = new byte[]{buf.get()};
                    byte[] funcid_byte_arr = new byte[]{0,0,0,0};
                    //funcid_byte_arr[2] = funcid_byte[0];
                    funcid_byte_arr[3] = funcid_byte[0];
                    int funcid = ByteTools.byteArrayToInt(funcid_byte_arr);
                    logger.warn("协议类型:" + funcid);
                    //读取数据包标识码8个字节
                    byte[] packetIdCode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get()};
                    long packetIdCode = ByteTools.byteArrayToLong(packetIdCode_byte);
                    logger.debug("数据包标识码:" + packetIdCode);

                    //读取报文正文内容
                    int oldLimit = buf.limit();
                    logger.debug("limit:" + (buf.position() + length));
                    //当前读取的位置 + 总长度  - 前面读取的字节长度 - 校验码
                    buf.limit(buf.position() + length - 19);
                    String content = buf.getString(ctx.getDecoder());
                    buf.limit(oldLimit);
                    logger.debug("报文正文内容:" + content);
                    CRC32 crc = new CRC32();
                    crc.update(content.getBytes("GBK"));

                    //读取校验码 4个字节
                    byte[] checkcode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get()};
                    byte[] checkcode_byte_arr = new byte[]{0,0,0,0,0,0,0,0};
                    checkcode_byte_arr[4] = checkcode_byte[0];
                    checkcode_byte_arr[5] = checkcode_byte[1];
                    checkcode_byte_arr[6] = checkcode_byte[2];
                    checkcode_byte_arr[7] = checkcode_byte[3];
                    long checkcode = ByteTools.byteArrayToLong(checkcode_byte_arr);
                    logger.debug("校验码:" + checkcode);
                    //验证校验码
                    if(checkcode != crc.getValue()){
                        // 如果消息包不完整,将指针重新移动消息头的起始位置
                        buf.reset();
                        break;
                    }
                    //读取包尾 2个字节
                    String packtail = new String(new byte[]{buf.get(),buf.get()});
                    logger.debug("包尾:" + packtail);
                    if(!PACK_TAIL.equals(packtail)){
                        // 如果消息包不完整,将指针重新移动消息头的起始位置
                        buf.reset();
                        break;
                    }
                    BaseMessageForServer message = new BaseMessageForServer();
                    message.setLength(length);
                    message.setCheckCode(checkcode);
                    message.setFuncid(funcid);
                    message.setPacketIdCode(packetIdCode);
                    message.setContent(content);
                    out.write(message);
                }else{
                    // 如果消息包不完整,将指针重新移动消息头的起始位置
                    buf.reset();
                    break;
                }
            }else{
                // 如果消息包不完整,将指针重新移动消息头的起始位置
                buf.reset();
                break;
            }
        }
        if (buf.hasRemaining()) {
            // 将数据移到buffer的最前面
            IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
            temp.put(buf);
            temp.flip();
            buf.clear();
            buf.put(temp);
        } else {// 如果数据已经处理完毕,进行清空
            buf.clear();
        }
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out)
            throws Exception {
        // TODO Auto-generated method stub

    }

    public void dispose(IoSession session) throws Exception {
        // TODO Auto-generated method stub

    }

    // 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
    // 所以,需要上下文拼起来才能完整的处理
    private class Context {
        private final CharsetDecoder decoder;
        private IoBuffer buf;
        private int matchCount = 0;
        private int overflowPosition = 0;

        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(3000).setAutoExpand(true);
        }

        public CharsetDecoder getDecoder() {
            return decoder;
        }

        public IoBuffer getBuffer() {
            return buf;
        }

        @SuppressWarnings("unused")
        public int getOverflowPosition() {
            return overflowPosition;
        }

        @SuppressWarnings("unused")
        public int getMatchCount() {
            return matchCount;
        }

        @SuppressWarnings("unused")
        public void setMatchCount(int matchCount) {
            this.matchCount = matchCount;
        }

        @SuppressWarnings("unused")
        public void reset() {
            overflowPosition = 0;
            matchCount = 0;
            decoder.reset();
        }

        public void append(IoBuffer in) {
            getBuffer().put(in);
        }
    }

}
6、继承处理器IoHandlerAdapter

public class NSMinaHandler extends IoHandlerAdapter {

    private final Logger logger = Logger.getLogger(NSMinaHandler.class);
    public static ConcurrentHashMap<Long, IoSession> sessionHashMap = new ConcurrentHashMap<Long, IoSession>();

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        session.closeOnFlush();
        logger.error("session occured exception, so close it."
                + cause.getMessage());
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        BaseMessageForServer basemessage = (BaseMessageForServer) message;
        logger.debug("客户端"
                + ((InetSocketAddress) session.getRemoteAddress()).getAddress()
                        .getHostAddress() + "连接成功!");
        session.setAttribute("type", message);
        String remoteAddress = ((InetSocketAddress) session.getRemoteAddress())
                .getAddress().getHostAddress();
        session.setAttribute("ip", remoteAddress);
        // 组装消息内容,返回给客户端
        BaseMessageForClient messageForClient = new BaseMessageForClient();
        messageForClient.setFuncid(2);
        if (basemessage.getContent().indexOf("hello") > 0) {
            // 内容
            messageForClient.setContent("hello,我收到您的消息了! ");
        } else {
            // 内容
            messageForClient.setContent("恭喜,您已经入门! ");
        }
        // 校验码生成
        CRC32 crc32 = new CRC32();
        crc32.update(messageForClient.getContent().getBytes());
        // crc校验码
        messageForClient.setCheckCode(crc32.getValue());
        // 长度
        messageForClient
                .setLength(messageForClient.getContent().getBytes().length);

        // 数据包标识码
        messageForClient.setPacketIdCode(basemessage.getPacketIdCode());
        session.write(messageForClient);
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        logger.debug("messageSent:" + message);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        logger.debug("remote client [" + session.getRemoteAddress().toString()
                + "] connected.");
        Long time = System.currentTimeMillis();
        session.setAttribute("id", time);
        sessionHashMap.put(time, session);
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        logger.debug("sessionClosed");
        session.closeOnFlush();
        sessionHashMap.remove(session.getAttribute("id"));
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        logger.debug("session idle, so disconnecting......");
        session.closeOnFlush();
        logger.warn("disconnected");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        logger.debug("sessionOpened.");
    }

}
7、实现心跳工厂KeepAliveMessageFactory

public class NSMinaKeepAliveMessageFactory implements KeepAliveMessageFactory {

    private final Logger logger = Logger
            .getLogger(NSMinaKeepAliveMessageFactory.class);

    private BaseMessageForServer basemessage;
    /** 心跳包内容 */

    private static long packetIdCode = 0;

    /**
     * 判断是否心跳请求包 是的话返回true
     */
    public boolean isRequest(IoSession session, Object message) {
        // TODO Auto-generated method stub
        if (message instanceof BaseMessageForServer) {
            basemessage = (BaseMessageForServer) message;
            // 心跳包方法协议类型
            if (basemessage.getFuncid() == 3) {
                // 为3,代表是一个心跳包,
                packetIdCode = basemessage.getPacketIdCode();

                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 由于被动型心跳机制,没有请求当然也就不关注反馈 因此直接返回false
     */
    public boolean isResponse(IoSession session, Object message) {
        // TODO Auto-generated method stub
        return false;
    }

    /**
     * 被动型心跳机制无请求 因此直接返回nul
     */
    public Object getRequest(IoSession session) {
        // TODO Auto-generated method stub
        return null;
    }

    /**
     * 根据心跳请求request 反回一个心跳反馈消息
     */
    public Object getResponse(IoSession session, Object request) {
        // 组装消息内容,返回给客户端
        BaseMessageForClient messageForClient = new BaseMessageForClient();
        messageForClient.setFuncid(4);
        // 内容
        messageForClient.setContent("2222");
        // 校验码生成
        CRC32 crc32 = new CRC32();
        crc32.update(messageForClient.getContent().getBytes());
        // crc校验码
        messageForClient.setCheckCode(crc32.getValue());
        // 长度
        messageForClient
                .setLength(messageForClient.getContent().getBytes().length);

        // 数据包标识码
        messageForClient.setPacketIdCode(packetIdCode);
        return messageForClient;
    }

}
8、客户端启动

         NioSocketConnector connector = new NioSocketConnector();
        //添加过滤器
        connector.getFilterChain().addLast("logger", new LoggingFilter());
        //设置编码,解码过滤器
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory()));
        //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));//设置编码过滤器
        connector.setHandler(new ClientHandler());//设置事件处理器
        ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1",8888)); //建立连接
        cf.awaitUninterruptibly(); //等待连接创建完成
        BaseMessageForServer message = new BaseMessageForServer();
        String content = "hello world!";
        CRC32 crc = new CRC32();
        try {
            crc.update(content.getBytes("GBK"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        message.setFuncid(5);
        message.setPacketIdCode(10000);
        message.setContent(content);
        message.setCheckCode(crc.getValue());
        message.setLength(content.getBytes().length);
        cf.getSession().write(message);
Mina工作流程:

当远程客户机首次访问采用Mina编写的程序时,IOAcceptor作为线程运行,负责接收来自客户的请求。当有客户请求连接时,创建一个IoSession,该IoSession与IoProcessor,SocketChannel与IoService联系起来。IoProcessor作为另一个线程运行,定时检查客户是否有数据到来,并对客户请求进行处理,一次调用IoService中注册的各个Filter,最后调用
IoHandler进行最终的逻辑处理,在将结果过滤后返回给客户端。

3个重要元素:

IoService(session:创建服务对象)<-->IoFilterChain(编码解码等)<-->IoHandler(业务处理)

IoSerivce为顶层接口,常用类:NioSocketAcceptor, NioSocketConnector

IoSession常用方法:
(1)setAttribute(Object key, Object value) getAttribute(Object key)
功能说明:设置/获取用户定义的属性,与web应用中的session类方法setAttribute,getAttribute方法类似
(2)getRemoteAddress()
功能说明:获取远程客户端地址。
(3)getId()
功能说明:获取Session的Id
(4)getCreationTime()
功能说明:获得创建时间
(5)getLastIoTime()
功能说明:获得上次IO时间
(6) getConfig()
功能说明:获得配置信息。
(7)write(Object message)
功能说明:将数据发送给客户端。
(8)close()
功能说明:关闭Session。
(说明:可以在Session中发送数据,但是Session没有提供读取数据的方法,读取数据通过解码器中的decode或者IoHandler中messageReceived方法)

IoHandler 常用方法:
(1)void exceptionCaught(IoSession session, Throwable cause)
功能说明:有异常发生时被触发。
(2)void messageReceived(IoSession session, Object message)
功能说明:有消息到达时被触发,message代表接收到的消息。
(3)void messageSent(IoSession session, Object message)
功能说明:发送消息时时被触发,即在调用IoSession.write()时被触发,message代表将要发送的消息。
(4)void sessionClosed(IoSession session)
功能说明:当连接关闭时被触发,即Session终止时被触发。
(5)void sessionCreated(IoSession session)
功能说明:当创建一个新连接时被触发,即当开始一个新的Session时被触发。
(6)void sessionIdle(IoSession session, IdleStatus status)
功能说明:当连接空闲时被触发。使用IoSessionConfig中的setIdleTime(IdleStatus status, int idleTime)方法可以设置session的空闲时间。如果该Session的空闲时间超过设置的值,该方法被触发,可以通过session.getIdleCount(status)来获取sessionIdle被触发的次数。
(7)void sessionOpened(IoSession session)
功能说明:当打开一个连接时被触发。在目前的实现中,好像 sessionOpened 和 sessionCreated 没有太大区别,sessionCreated 在 sessionOpened 之前被触发。

说明:IoHandler封装了来自客户端不同事件的处理,通过实现相应的方法,当该事件发生时,IoHandler中的方法就会被触发执行。

使用示例:

//创建服务对象
IoConnector mTcpClient = new NioSocketConnector();

//添加过滤器
mTcpClient.getFilterChain().addLast(过滤器);

//public class MinaTcpClientHandler extends IoHandlerAdapter
MinaTcpClientHandler handler = new MinaTcpClientHandler(objIDCardInfoList,mHandler);

//添加处理器,一般继承IoHandlerAdapter并重写messageReceived方法用于接收新消息
mTcpClient.setHandler(handler);

//创建链接
ConnectFuture future =mTcpClient.connect(new InetSocketAddress(路径, 端口号));

// 等待连接创建完成
future.awaitUninterruptibly();

//获得session
session = future.getSession();
//写数据
session.write(strData);

if(session != null){
        session.getCloseFuture().awaitUninterruptibly();//等待连接断开
}
mTcpClient.dispose();

//-------------UDP:过滤器的生成--------------------
mUdpClient.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("UTF-8"))));

public class MyProtocalCodecFactory implements ProtocolCodecFactory {  
    private final JsonMessageEncoder encoder;  
    private final JsonMessageDecoder decoder;  
        //getter and setter
}

public class JsonMessageDecoder extends CumulativeProtocolDecoder {
        @Override
        protected boolean doDecode(...){}
}
断线重连:

public class ClientReconnectTest {

    public static IoSession session = null;

    public static  NioSocketConnector connector = null;

    public static void main(String[] args) {
        connector = new NioSocketConnector();
        connector.setConnectTimeoutMillis(30000); // 设置连接超时
        connector.getSessionConfig().setReceiveBufferSize(10240); // 设置接收缓冲区的大小
        connector.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
        connector.getFilterChain().addLast("logger", new LoggingFilter());
        connector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ByteArrayCodecFactory()));// 设置编码过滤器
        connector.setHandler(new ClientHandler());// 设置事件处理器
        connector.setDefaultRemoteAddress(new InetSocketAddress("127.0.0.1",
                8888));// 设置默认访问地址

        // 添加重连监听---实现自动重连
        connector.addListener(new IoListener() {
            @Override
            public void sessionDestroyed(IoSession arg0) throws Exception {
                //重连10次
                for (int i= 0 ;i <= 10; i++) {
                    try {
                        Thread.sleep(3000);
                        ConnectFuture future = connector.connect();
                        future.awaitUninterruptibly();// 等待连接创建成功
                        session = future.getSession();// 获取会话
                        if (session.isConnected()) {
                            System.out.println("断线重连["
                                    + connector.getDefaultRemoteAddress()
                                            .getHostName()
                                    + ":"
                                    + connector.getDefaultRemoteAddress()
                                            .getPort() + "]成功");
                            break;
                        }
                    } catch (Exception ex) {
                        System.out.println("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
                    }
                }
            }
        });
        //确保连接成功,连接n次,
        for (int i= 0 ;i <= 5; i++) {
            try {
                ConnectFuture future = connector.connect();
                future.awaitUninterruptibly(); // 等待连接创建成功
                session = future.getSession(); // 获取会话
                if(session.isConnected()){
                    System.out.println("连接服务端"
                            + connector.getDefaultRemoteAddress()
                            .getHostName()
                            + ":"
                            + connector.getDefaultRemoteAddress()
                            .getPort()
                            + "[成功]"
                            + ",,时间:"
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                    .format(new Date()));
                    break;
                }
            } catch (RuntimeIoException e) {
                System.out.println(
                        "连接服务端"
                                + connector.getDefaultRemoteAddress()
                                .getHostName()
                                + ":"
                                + connector.getDefaultRemoteAddress()
                                .getPort()
                                + "失败"
                                + ",,时间:"
                                + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                        .format(new Date())
                                + ", 连接SOCKET服务异常,请检查SOCKET端口、IP是否正确,MSG服务是否启动,异常内容:"
                                + e.getMessage());
                try {
                    Thread.sleep(5000);// 连接失败后,重连间隔5s
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

        BaseMessageForServer message = new BaseMessageForServer();
        String content = "hello world!";
        CRC32 crc = new CRC32();
        try {
            crc.update(content.getBytes("GBK"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        message.setFuncid(5);
        message.setPacketIdCode(10000);
        message.setContent(content);
        message.setCheckCode(crc.getValue());
        message.setLength(content.getBytes().length);
        session.write(message);
    }

}

---------------------
【转载,仅作分享,侵删】
作者:river66
原文:https://blog.csdn.net/river66/article/details/87715802
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
奈斯,感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马