A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

1、全文提要

系统间通信本来是一个很大的概念,我们首先重通信模型开始讲解。在理解了四种通信模型的工作特点和区别后,对于我们后文介绍搭建在其上的各种通信框架,集成思想都是有益的。

目前常用的IO通信模型包括四种(这里说的都是网络IO):阻塞式同步IO、非阻塞式同步IO、多路复用IO、和真正的异步IO。这些IO模式都是要靠操作系统进行支持,应用程序只是提供相应的实现,对操作系统进行调用。

上篇中,首先介绍传统的阻塞式同步IO和非阻塞式同步IO两种IO工作模式,然后使用JAVA进行实现;下篇,对多路复用IO工作模式和异步IO工作模式进行介绍,并介绍java对这两种工作模式的支持。

2、传统阻塞模式(BIO)

这个小节的介绍,在《架构设计:系统间通信(1)——概述从“聊天”开始上篇》这篇文章中已经说明了,这里只是“接着讲”,您可以理解成“在概述的基础上继续深入写”。BIO就是:blocking IO。最容易理解、最容易实现的IO工作方式,应用程序向操作系统请求网络IO操作,这时应用程序会一直等待;另一方面,操作系统收到请求后,也会等待,直到网络上有数据传到监听端口;操作系统在收集数据后,会把数据发送给应用程序;最后应用程序受到数据,并解除等待状态。如下图所示:

(请您注意,上图中交互的两个元素是应用程序和它所使用的操作系统)就TCP协议来说,整个过程实际上分成三个步骤:三次握手建立连接、传输数据(包括验证和重发)、断开连接。当然,断开连接的过程并不在我们讨论的IO的主要过程中。但是我们讨论IO模型,应该把建立连接和传输数据的者两个过程分开讨论。

2-1、JAVA对阻塞模式的支持

JAVA对阻塞模式的支持,就是java.net包中的Socket套接字实现。这里要说明一下,Socket套接字是TCP/UDP等传输层协议的实现。例如客户端使用TCP协议连接这台服务器的时候,当TCP三次握手成功后,应用程序就会创建一个socket套接字对象(注意,这是还没有进行数据内容的传输),当这个TCP连接出现数据传输时,socket套接字就会把数据传输的表现告诉程序员(例如read方法接触阻塞状态)


下面这段代码是java对阻塞模式的支持:


package testBSocket;


import java.io.InputStream;

import java.io.OutputStream;

import java.net.ServerSocket;

import java.net.Socket;


import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.log4j.BasicConfigurator;


public class SocketServer1 {


    static {

        BasicConfigurator.configure();

    }


    /**

     * 日志

     */

    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);


    public static void main(String[] args) throws Exception{

        ServerSocket serverSocket = new ServerSocket(83);


        try {

            while(true) {

                //这里JAVA通过JNI请求操作系统,并一直等待操作系统返回结果(或者出错)

                Socket socket = serverSocket.accept();


                //下面我们收取信息(这里还是阻塞式的,一直等待,直到有数据可以接受)

                InputStream in = socket.getInputStream();

                OutputStream out = socket.getOutputStream();

                Integer sourcePort = socket.getPort();

                int maxLen = 2048;

                byte[] contextBytes = new byte[maxLen];

                int realLen;

                StringBuffer message = new StringBuffer();

                //read的时候,程序也会被阻塞,直到操作系统把网络传来的数据准备好。

                while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {

                    message.append(new String(contextBytes , 0 , realLen));

                    /*

                     * 我们假设读取到“over”关键字,

                     * 表示客户端的所有信息在经过若干次传送后,完成

                     * */

                    if(message.indexOf("over") != -1) {

                        break;

                    }

                }

                //下面打印信息

                SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);


                //下面开始发送信息

                out.write("回发响应信息!".getBytes());


                //关闭

                out.close();

                in.close();

                socket.close();

            }

        } catch(Exception e) {

            SocketServer1.LOGGER.error(e.getMessage(), e);

        } finally {

            if(serverSocket != null) {

                serverSocket.close();

            }

        }

    }

}

上面的服务器端代码可以直接运行。代码执行到serverSocket.accept()的位置就会等待,这个调用的含义是应用程序向操作系统请求客户端连接的接收,这是代码会阻塞,而底层调用的位置在DualStackPlainSocketImpl这个类里面(注意我使用的测试环境是windows 8 ,所以是由这个类处理;如果您是在windows 7环境下进行测试,那么处理类是TwoStacksPlainSocketImpl,这是Windows环境;如果您使用的测试环境是Linux,那么视Linux的内核版本而异,具体的处理类又是不一样的)。


2-2、存在的问题

很明显,我们在代码里面并没有设置timeout属性,所以运行的是“if”这段的代码,很明显在调用JNI后,下层也在等待有客户端连接上来。这种调用方式当然有问题:

  • 同一时间,服务器只能接受来自于客户端A的请求信息;虽然客户端A和客户端B的请求是同时进行的,但客户端B发送的请求信息只能等到服务器接受完A的请求数据后,才能被接受。

  • 由于服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的情况下,是不能采用的。

  • 实际上以上的问题是可以通过多线程来解决的,实际上就是当accept接收到一个客户端的连接后,服务器端启动一个新的线程,来读写客户端的数据,并完成相应的业务处理。但是你无法影响操作系统底层的“同步IO”机制。


3、非阻塞模式

一定要注意:阻塞/非阻塞的描述是针对应用程序中的线程进行的,对于阻塞方式的一种改进是应用程序将其“一直等待”的状态主动打开,如下图所示:

这种模式下,应用程序的线程不再一直等待操作系统的IO状态,而是在等待一段时间后,就解除阻塞。如果没有得到想要的结果,则再次进行相同的操作。这样的工作方式,暴增了应用程序的线程可以不会一直阻塞,而是可以进行一些其他工作。

3-1、JAVA对非阻塞模式的支持

那么JAVA中是否支持这种非阻塞IO的工作模式呢?我们继续分析DualStackPlainSocketImpl中的accept0实现:

那么timeout是在哪里设置的呢?在ServerSocket中,调用了DualStackPlainSocketImpl的父类SocketImpl进行timeout的设置:

ServerSocket中的setSoTimeout方法也有相应的注释说明:

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

那么java中对非阻塞IO的支持如下:


package testBSocket;


import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.ServerSocket;

import java.net.Socket;

import java.net.SocketTimeoutException;


import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.log4j.BasicConfigurator;


public class SocketServer2 {


    static {

        BasicConfigurator.configure();

    }


    private static Object xWait = new Object();


    /**

     * 日志

     */

    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);


    public static void main(String[] args) throws IOException {

        ServerSocket serverSocket = null;


        try {

            serverSocket = new ServerSocket(83);

            serverSocket.setSoTimeout(100);

            while(true) {

                Socket socket = null;

                try {

                    socket = serverSocket.accept();

                } catch(SocketTimeoutException e1) {

                    //===========================================================

                    //      执行到这里,说明本次accept没有接收到任何数据报文

                    //      主线程在这里就可以做一些事情,记为X

                    //===========================================================

                    synchronized (SocketServer2.xWait) {

                        SocketServer2.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件X的处理时间");

                        SocketServer2.xWait.wait(10);

                    }

                    continue;

                }


                InputStream in = socket.getInputStream();

                OutputStream out = socket.getOutputStream();

                Integer sourcePort = socket.getPort();

                int maxLen = 2048;

                byte[] contextBytes = new byte[maxLen];

                int realLen;

                StringBuffer message = new StringBuffer();

                //下面我们收取信息(这里还是阻塞式的,一直等待,直到有数据可以接受)

                while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {

                    message.append(new String(contextBytes , 0 , realLen));

                    /*

                     * 我们假设读取到“over”关键字,

                     * 表示客户端的所有信息在经过若干次传送后,完成

                     * */

                    if(message.indexOf("over") != -1) {

                        break;

                    }

                }

                //下面打印信息

                SocketServer2.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);


                //下面开始发送信息

                out.write("回发响应信息!".getBytes());


                //关闭

                out.close();

                in.close();

                socket.close();

            }

        } catch(Exception e) {

            SocketServer2.LOGGER.error(e.getMessage(), e);

        } finally {

            if(serverSocket != null) {

                serverSocket.close();

            }

        }

    }

}


执行效果如下:


这里我们针对了SocketServer增加了阻塞等待时间,实际上只实现了非阻塞IO模型中的第一步:监听连接状态的非阻塞。通过运行代码,我们可以发现read()方法还是被阻塞的,说明socket套接字等待数据读取的过程,还是阻塞方式。

3-2、继续改进

那么,我们能不能改进read()方式,让它也变成非阻塞模式呢?当然是可以的,socket套接字同样支持等待超时时间设置。代码如下:


package testBSocket;


import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.ServerSocket;

import java.net.Socket;

import java.net.SocketTimeoutException;


import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.log4j.BasicConfigurator;


public class SocketServer3 {


    static {

        BasicConfigurator.configure();

    }


    private static Object xWait = new Object();


    /**

     * 日志

     */

    private static final Log LOGGER = LogFactory.getLog(SocketServer3.class);


    public static void main(String[] args) throws IOException {

        ServerSocket serverSocket = null;


        try {

            serverSocket = new ServerSocket(83);

            serverSocket.setSoTimeout(100);

            while(true) {

                Socket socket = null;

                try {

                    socket = serverSocket.accept();

                } catch(SocketTimeoutException e1) {

                    //===========================================================

                    //      执行到这里,说明本次accept没有接收到任何TCP连接

                    //      主线程在这里就可以做一些事情,记为X

                    //===========================================================

                    synchronized (SocketServer3.xWait) {

                        SocketServer3.LOGGER.info("这次没有从底层接收到任何TCP连接,等待10毫秒,模拟事件X的处理时间");

                        SocketServer3.xWait.wait(10);

                    }

                    continue;

                }


                InputStream in = socket.getInputStream();

                OutputStream out = socket.getOutputStream();

                Integer sourcePort = socket.getPort();

                int maxLen = 2048;

                byte[] contextBytes = new byte[maxLen];

                int realLen;

                StringBuffer message = new StringBuffer();

                //下面我们收取信息(设置成非阻塞方式,这样read信息的时候,又可以做一些其他事情)

                socket.setSoTimeout(10);

                BIORead:while(true) {

                    try {

                        while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {

                            message.append(new String(contextBytes , 0 , realLen));

                            /*

                             * 我们假设读取到“over”关键字,

                             * 表示客户端的所有信息在经过若干次传送后,完成

                             * */

                            if(message.indexOf("over") != -1) {

                                break BIORead;

                            }

                        }

                    } catch(SocketTimeoutException e2) {

                        //===========================================================

                        //      执行到这里,说明本次read没有接收到任何数据流

                        //      主线程在这里又可以做一些事情,记为Y

                        //===========================================================

                        SocketServer3.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");

                        continue;

                    }

                }

                //下面打印信息

                SocketServer3.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);


                //下面开始发送信息

                out.write("回发响应信息!".getBytes());


                //关闭

                out.close();

                in.close();

                socket.close();

            }

        } catch(Exception e) {

            SocketServer3.LOGGER.error(e.getMessage(), e);

        } finally {

            if(serverSocket != null) {

                serverSocket.close();

            }

        }

    }

}


这样一来,我们利用JAVA实现了完整的“非阻塞IO”模型:让TCP连接和数据读取这两个过程,都变成了“非阻塞”方式了。


然并卵,这种处理方式实际上并没有解决accept方法、read方法阻塞的根本问题。根据上文的叙述,accept方法、read方法阻塞的根本问题是底层接受数据报文时的“同步IO”工作方式。这两次改进过程,只是解决了IO操作的两步中的第一步:将程序层面的阻塞方式变成了非阻塞方式。


3-3、利用线程再改进


另一个方面,由于应用程序级别,我们并没有使用多线程技术,这就导致了应用程序只能一个socket套接字 一个socket套接字的处理。这个socket套接字没有处理完,就没法处理下一个socket套接字。针对这个问题我们还是可以进行改进的:让应用程序层面上,各个socket套接字的处理不相互影响:


package testBSocket;


import java.io.InputStream;

import java.io.OutputStream;

import java.net.ServerSocket;

import java.net.Socket;

import java.net.SocketTimeoutException;


import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.log4j.BasicConfigurator;


/**

* 通过加入线程的概念,让socket server能够在应用层面,

* 通过非阻塞的方式同时处理多个socket套接字

* @author yinwenjie

*/

public class SocketServer4 {


    static {

        BasicConfigurator.configure();

    }


    private static Object xWait = new Object();


    private static final Log LOGGER = LogFactory.getLog(SocketServer4.class);


    public static void main(String[] args) throws Exception{

        ServerSocket serverSocket = new ServerSocket(83);

        serverSocket.setSoTimeout(100);

        try {

            while(true) {

                Socket socket = null;

                try {

                    socket = serverSocket.accept();

                } catch(SocketTimeoutException e1) {

                    //===========================================================

                    //      执行到这里,说明本次accept没有接收到任何TCP连接

                    //      主线程在这里就可以做一些事情,记为X

                    //===========================================================

                    synchronized (SocketServer4.xWait) {

                        SocketServer4.LOGGER.info("这次没有从底层接收到任何TCP连接,等待10毫秒,模拟事件X的处理时间");

                        SocketServer4.xWait.wait(10);

                    }

                    continue;

                }

                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。

                //最终改变不了.accept()只能一个一个接受socket连接的情况

                SocketServerThread socketServerThread = new SocketServerThread(socket);

                new Thread(socketServerThread).start();

            }

        } catch(Exception e) {

            SocketServer4.LOGGER.error(e.getMessage(), e);

        } finally {

            if(serverSocket != null) {

                serverSocket.close();

            }

        }

    }

}


/**

* 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。

* 但还是改变不了socket被一个一个的做accept()的情况。

* @author yinwenjie

*/

class SocketServerThread implements Runnable {


    /**

     * 日志

     */

    private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);


    private Socket socket;


    public SocketServerThread (Socket socket) {

        this.socket = socket;

    }


    @Override

    public void run() {

        InputStream in = null;

        OutputStream out = null;

        try {

            in = socket.getInputStream();

            out = socket.getOutputStream();

            Integer sourcePort = socket.getPort();

            int maxLen = 2048;

            byte[] contextBytes = new byte[maxLen];

            int realLen;

            StringBuffer message = new StringBuffer();

            //下面我们收取信息(设置成非阻塞方式,这样read信息的时候,又可以做一些其他事情)

            this.socket.setSoTimeout(10);

            BIORead:while(true) {

                try {

                    while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {

                        message.append(new String(contextBytes , 0 , realLen));

                        /*

                         * 我们假设读取到“over”关键字,

                         * 表示客户端的所有信息在经过若干次传送后,完成

                         * */

                        if(message.indexOf("over") != -1) {

                            break BIORead;

                        }

                    }

                } catch(SocketTimeoutException e2) {

                    //===========================================================

                    //      执行到这里,说明本次read没有接收到任何数据流

                    //      主线程在这里又可以做一些事情,记为Y

                    //===========================================================

                    SocketServerThread.LOGGER.info("这次没有从底层接收到任务数据报文,等待10毫秒,模拟事件Y的处理时间");

                    continue;

                }

            }

            //下面打印信息

            Long threadId = Thread.currentThread().getId();

            SocketServerThread.LOGGER.info("服务器(线程:" + threadId + ")收到来自于端口:" + sourcePort + "的信息:" + message);


            //下面开始发送信息

            out.write("回发响应信息!".getBytes());


            //关闭

            out.close();

            in.close();

            this.socket.close();

        } catch(Exception e) {

            SocketServerThread.LOGGER.error(e.getMessage(), e);

        }

    }

}


3-4、依然存在的问题


引入了多线程技术后,IO的处理吞吐量大大提高了,但是这样做就真的没有问题了吗,您要知道操作系统可是有“最大线程”限制的:


虽然在服务器端,请求的处理交给了一个独立线程进行,但是操作系统通知accept()的方式还是单个处理的(甚至都不是非阻塞模式)。也就是说,实际上是服务器接收到数据报文后的“业务处理过程”可以多线程(包括可以是非阻塞模式),但是数据报文的接受还是需要一个一个的来。


在linux系统中,可以创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令查看可以创建的最大线程数。当然这个值是可以更改的,但是线程越多,CPU切换所需的时间也就越长,用来处理真正业务的需求也就越少。


创建一个线程是有较大的资源消耗的。JVM创建一个线程的时候,即使这个线程不做任何的工作,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您可以通过-Xss参数进行调整。


当然您还可以使用ThreadPoolExecutor线程池来缓解线程的创建问题,但是又会造成BlockingQueue积压任务的持续增加,同样消耗了大量资源。另外,如果您的应用程序大量使用长连接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。


最后,无论您是使用的多线程、还是加入了非阻塞模式,这都是在应用程序层面的处理,而底层socketServer所匹配的操作系统的IO模型始终是“同步IO”,最根本的问题并没有解决。


那么,如果你真想单纯使用线程来解决问题,那么您自己都可以计算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。


4、多路复用IO(IO Multiplex)


我将详细讲解操作系统支持的多路复用IO的工作方式,并介绍JAVA 1.4版本中加入的 JAVA NIO对多路复用IO的实现。(东西太多,我们放下下篇中)


5、异步IO(真正的NIO)


我将详细讲解操作系统支持的异步IO方式,并介绍JAVA 1.7版本中加入的NIO2.0(AIO)对异步IO的实现。(东西太多,我们放下下篇中)


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马