黑马程序员技术交流社区

标题: 【上海校区】系统间通信(1)——概述从“聊天”开始上篇 [打印本页]

作者: wuqiong    时间: 2018-6-12 11:39
标题: 【上海校区】系统间通信(1)——概述从“聊天”开始上篇

从这篇博文开始,我们将进入一个新文章系列。这个文章系列专门整理总结了目前系统间通信的主要原理、手段和实现。我们将讲解典型的信息格式、讲解传统的RMI调用并延伸出来重点讲解RPC调用和使用案例;最后我们还会讲到SOA架构的实现,包括ESB实现和服务注册/治理的实现,同样包括原理、实现和使用案例。

系统间通信是架构师需要掌握的又一个关键技术领域,如果说理解和掌握负载均衡层技术需要您有一定的linux系统知识和操作系统知识的话,那么理解和掌握系统间通信层技术,需要您有一定的编程经验(最好是JAVA编程经验,因为我们会主要以JAVA技术作为实例演示)。

1、一个场景

首先我们来看一个显示场景:在现实生活中有两个人技术人员A和B,在进行一问一答形式的交流。如下图所示:

我们来看这幅图的中的几个要点:

2、信息格式

很明显通过中文的交谈,两个人相互明白了对方的意图。为了保证信息传递的高效性,我们一定会将信息做成某种参与者都理解的格式。例如:中文有其特定的语法结构,例如主谓宾,定状补。

在计算机领域为了保证信息能够被处理,信息也会被做成特定的格式,而且要确保目标能够明白这种格式。常用的信息格式包括:

这里有一篇介绍TLV的文章:《通信协议之序列化TLV》,TLV格式所携带的内容是最有效的,它就连JSON中用于分割层次的“{}”符号都没有。

在这个系列的博文中,我们不会把信息格式作为一个重点,但是会花一些篇幅去比较各种信息格式在网络上传输的速度、性能,并为大家介绍几种典型的信息格式选型场景。

3、网络协议

如文中第一张图描述的场景,有一个我们看不到但是却很重要的元素:空气。声音在空气中完成传播,真空无法传播声音。同样信息是在网络中完成传播的,没有网络就没法传播信息。网络协议就是计算机领域的“空气”,下图中我们以OSI模型作为参考:

在这个系列的博文中,我们不会把网络协议作为一个重点。这是因为网络网络协议的知识是一个相对独立的的知识领域,十几篇文章都不一定讲得清楚。如果您对网络协议有兴趣,这里推荐两本书:《TCP/IP详解.卷1-协议》和《TCP/IP详解.卷2-实现》。

4、通信方式/框架

在文章最前面我们看到其中一个人规定了一种沟通方式:“你必须把我说的话听完,然后给我反馈后。我才会问第二个问题”。这种沟通方式虽然沟通效率不高,但是很有效:一个问题一个问题的处理。

但是如果参与沟通的人处理信息的能力比较强,那么他们还可以采用另一种沟通方式:“我给我提的问题编了一个号,在问完第X个问题后,我不会等待你返回,就会问第X+1个问题,同样你在听完我第X个问题后,一边处理我的问题,一边听我第X+1个问题。”

实际上以上两种现实中的沟通方式,在计算机领域是可以找到对应的通信方式的,这就是我们这个系列的博文会着重讲的BIO(阻塞模式)通信和NIO(非阻塞模式)。

4-1、BIO通信方式

以前大多数网络通信方式都是阻塞模式的,即:

如下图所示:

传统的BIO通信方式存在几个问题:

上面说的情况是服务器只有一个线程的情况,那么读者会直接提出我们可以使用多线程技术来解决这个问题:

如下图所示:

但是使用线程来解决这个问题实际上是有局限性的:

4-2、BIO通信方式深入分析

在这个系列的博文中,通信方式/框架将作为一个重点进行讲解。包括NIO的原理,并通过讲解Netty的使用、JAVA原生NIO框架的使用,去熟悉这些核心原理。

实际上从上文中我们可以看出,BIO的问题关键不在于是否使用了多线程(包括线程池)处理这次请求,而在于accept()、read()的操作点都是被阻塞。要测试这个问题,也很简单。我们模拟了20个客户端(用20根线程模拟),利用JAVA的同步计数器CountDownLatch,保证这20个客户都初始化完成后然后同时向服务器发送请求,然后我们来观察一下Server这边接受信息的情况。

4-2-1、模拟20个客户端并发请求,服务器端使用单线程:
客户端代码(SocketClientDaemon)
package testBSocket;

import java.util.concurrent.CountDownLatch;

public class SocketClientDaemon {
    public static void main(String[] args) throws Exception {
        Integer clientNumber = 20;
        CountDownLatch countDownLatch = new CountDownLatch(clientNumber);

        //分别开始启动这20个客户端
        for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) {
            SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
            new Thread(client).start();
        }

        //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
        synchronized (SocketClientDaemon.class) {
            SocketClientDaemon.class.wait();
        }
    }
}

客户端代码(SocketClientRequestThread模拟请求)
package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

/**
* 一个SocketClientRequestThread线程模拟一个客户端请求。
* @author yinwenjie
*/
public class SocketClientRequestThread implements Runnable {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);

    private CountDownLatch countDownLatch;

    /**
     * 这个线层的编号
     * @param countDownLatch
     */
    private Integer clientIndex;

    /**
     * countDownLatch是java提供的同步计数器。
     * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性
     * @param countDownLatch
     */
    public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) {
        this.countDownLatch = countDownLatch;
        this.clientIndex = clientIndex;
    }

    @Override
    public void run() {
        Socket socket = null;
        OutputStream clientRequest = null;
        InputStream clientResponse = null;

        try {
            socket = new Socket("localhost",83);
            clientRequest = socket.getOutputStream();
            clientResponse = socket.getInputStream();

            //等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求
            this.countDownLatch.await();

            //发送请求信息
            clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes());
            clientRequest.flush();

            //在这里等待,直到服务器返回信息
            SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息");
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            String message = "";
            //程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)
            while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
                message += new String(contextBytes , 0 , realLen);
            }
            SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message);
        } catch (Exception e) {
            SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                if(clientRequest != null) {
                    clientRequest.close();
                }
                if(clientResponse != null) {
                    clientResponse.close();
                }
            } catch (IOException e) {
                SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

服务器端(SocketServer1)单个线程
package testBSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);

        try {
            while(true) {
                Socket socket = serverSocket.accept();

                //下面我们收取信息
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                //这里也会被阻塞,直到有数据准备好
                int realLen = in.read(contextBytes, 0, maxLen);
                //读取信息
                String message = new String(contextBytes , 0 , realLen);

                //下面打印信息
                SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);

                //下面开始发送信息
                out.write("回发响应信息!".getBytes());

                //关闭
                out.close();
                in.close();
                socket.close();
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

4-2-2、就像上文所述我们可以使用多线程来优化服务器端的处理过程:

客户端代码和上文一样,最主要是更改服务器端的代码:

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 {

    static {
        BasicConfigurator.configure();
    }

    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);

        try {
            while(true) {
                Socket socket = serverSocket.accept();
                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

/**
* 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
* 但还是改变不了socket被一个一个的做accept()的情况。
* @author yinwenjie
*/
class SocketServerThread implements Runnable {

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);

    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            //下面我们收取信息
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            //使用线程,同样无法解决read方法的阻塞问题,
            //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
            int realLen = in.read(contextBytes, 0, maxLen);
            //读取信息
            String message = new String(contextBytes , 0 , realLen);

            //下面打印信息
            SocketServerThread.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);

            //下面开始发送信息
            out.write("回发响应信息!".getBytes());
        } catch(Exception e) {
            SocketServerThread.LOGGER.error(e.getMessage(), e);
        } finally {
            //试图关闭
            try {
                if(in != null) {
                    in.close();
                }
                if(out != null) {
                    out.close();
                }
                if(this.socket != null) {
                    this.socket.close();
                }
            } catch (IOException e) {
                SocketServerThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

4-2-3、看看服务器端的执行效果:

我相信服务器使用单线程的效果就不用看了,我们主要看一看服务器使用多线程处理时的情况:


4-2-4、问题根源

那么重点的问题并不是“是否使用了多线程”,而是为什么accept()、read()方法会被阻塞。即:异步IO模式 就是为了解决这样的并发性存在的。但是为了说清楚异步IO模式,在介绍IO模式的时候,我们就要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。

这里我要特别说明一下,在一篇网文《Java NIO与IO的详细区别(通俗篇)》中,作者主要讲到了自己对非阻塞方式下硬盘操作的理解。按照我的看法,只要有IO的存在,就会有阻塞或非阻塞的问题,无论这个IO是网络的,还是硬盘的。这就是为什么基本的JAVA NIO框架中会有FileChannel(而且FileChannel在操作系统级别是不支持非阻塞模式的)、DatagramChannel和SocketChannel的原因。NIO并不只是为了解决磁盘读写的性能而存在的,它的出现原因、要解决的问题更为广阔;但是另外一个方面,文章作者只是表达自己的思想,没有必要争论得“咬文嚼字”。

API文档中对于 serverSocket.accept() 方法的使用描述:

Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.

那么我们首先来看看为什么serverSocket.accept()会被阻塞。这里涉及到阻塞式同步IO的工作原理:

===================================
(内容太多,分上下两篇文章发布)









欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2