黑马程序员技术交流社区

标题: [南京校区]RabbitMQ消息中间件常见问 [打印本页]

作者: 大蓝鲸小蟀锅    时间: 2019-6-17 16:00
标题: [南京校区]RabbitMQ消息中间件常见问
RabbitMQ (有几种消息模式自己去看,这里不做解释)1. 使用RabbitMQ的一些经验
1. consume时预取参数的大小对consume性能影响很大,具体可参见官方博客
2. 队列HA的代价非常高,特别是对带宽的占用,有限制的使用HA,且只提供两备份即可;
3. 磁盘也可能形成瓶颈,如果单台机器队列很多,确认只在必要时才使用duration,避免把磁盘跑满;
4. 队列的消息大量累积后,发送和消费速度都会受到影响,导致服务进一步恶化,采用的方法是,额外的脚本监控每个队列的消息数,超过限额会执行purge操作,简单粗暴但是有效的保证了服务稳定;
5. 限制单条消息大小,我们的限制是128k,消息队列只走消息,其他交由存储去做;
6. iptables适当的限制连接;
2. RabbitMQ队列超长导致QueueingConsumerJVM溢出
我们的服务器使用RabbitMQ作为消息中转的容器,一般RabbitMQ进程占用内存不过100M-200M,这些队列超长的RabbitMQ进程可以占用超过2G的内存。
显然消息队列的消费者出现了问题。开发查看日志发现作为该队列消费者的Java服务的日志也卡住了,重启服务后(这点做得不对,应该用jstatjstack进行排查,而不是直接重启)又很快卡住。
通过jstat发现JVM内存都耗尽了,之后进入无尽的Full GC,所以当然不会处理队列消息和输出日志信息了。
使用jmap导出这时候的Java堆栈,命令:jmap -dump:format=b,file=29389.hprof  29389。将得到的dump文件放到MATEclipse Memory Analyzer)里进行分析,发现很明显是QueueingConsumer持有大量对象导致JVM内存溢出
解决方案将basicQos设置为16后重启服务,队列终于开始消化了。用jstat 观察JVM内存使用情况,也没有再出现剧增溢出的现象。
      
总结:使用RabbitMQ的时候,一定要合理地设置QoS参数。RabbitMQ的默认做法其实是很脆弱的,容易导致雪崩。
3. RabbitMQ数据速率问题
在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上)。
        在千兆网下,以500KB一条数据为例,读写速率均能达到200/s,约为100MB/s
在只写不读的情况下:写入速率瓶颈在于硬盘写入速度。
4. RabbitMQ数据存储路径变更到D盘方法
Windows环境下,在安装前设置环境变量:RABBITMQ_BASE=D:\RabbitMQ_Data
5. RabbitMQ磁盘写满重启后数据丢失问题
表现:磁盘写满后发送、读取程序均不能连接服务。
解决方法:将QueueExchange设置为Durable即不会发生数据丢失问题。
通过a.关闭服务;b.删除占位文件、erl_crash.dumpc.重启服务 三步操作后,磁盘会清理出10M左右空间,此时读取数据程序便可正常工作。
正确设计的架构,应确保RabbitMQ不会发生磁盘写满崩溃的情况。
6. RabbitMQ集群
在网络带宽占满的情况下,通过集群的方式解决吞吐量不足的问题需要多台效果才明显。
假设外设吞吐率为d/s,外设
RabbitMQ1发送的概率为r1
RabbitMQ2发送的概率为r2
RabbitMQ1需要向RabbitMQ2转发的概率为r3
RabbitMQ2需要向RabbitMQ1转发的概率为r3
那么RabbitMQ1进入的吞吐率为:(r1*d + r4*r2*d) /s 3d/4/s
RabbitMQ2进入的吞吐率为:(r2*d + r3*r1*d) /s 3d/4/s
这样的确比只使用一台RabbitMQ的吞吐率d/s要求低些。
NRabbitMQ的集群,每台的平均吞吐率为:(2N-1)d/(N*N) /sN=3时,平均吞吐率为5d/9/sN=4时,平均吞吐率为7d/16/s
解决方法:多台RabbitMQ服务器提供服务,在客户端以轮循方式访问服务,若1down掉则不使用此台的队列服务,服务器之间没有联系,这样NRabbitMQ的平均吞吐率为:1d/N /s。具体实现可以,专写一个用户收发RabbitMQ消息的jar/dll,在配置文件里填写RabbitMQ机器地址,使用轮循询问、收发的方式,提供给应用程序以黑盒方式调用。
下面提供了java版本的收发实现。
发送端sender.xml配置:
<!-- 处理器相关 -->
    <bean id="sender" class="demo.Sender">
        <property name="templates">
            <list>
                <ref bean="template1" />
            <!--     <ref bean="template2" /> -->
            </list>
        </property>
    </bean>
    <bean id="timeFlicker" class="demo.TimeFlicker">
        <property name="handlers">
            <list>
                <ref bean="sender" />
            </list>
        </property>
    </bean>
    <!-- 处理器相关 -->
    <!-- amqp配置 相关 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相关 -->
    <!-- 发送相关 -->
    <rabbit:template id="template1" connection-factory="connectionFactory1"
        exchange="exchange" />
    <rabbit:template id="template2" connection-factory="connectionFactory2"
        exchange="exchange" />
    <!-- 发送相关 -->
                  说明:这里配置了两个RabbitMQ服务器,timeFlicker的目的是过一段时间把不能服务的RabbitMQ服务器重新添加到列表中,重试发送。
                  接收端receiver.xml配置:
<!-- amqp配置 相关 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相关 -->
    <!-- 监听相关 -->
    <bean id="Recv1" class="demo.Recv1" />
    <rabbit:listener-container id="Listener1"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv1" method="listen"
            queue-names="queue1" />
    </rabbit:listener-container>
    <bean id="Recv2" class="demo.Recv2" />
    <rabbit:listener-container id="Listener"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv2" method="listen"
            queue-names="queue2" />
    </rabbit:listener-container>
    <!-- 监听相关 -->
MQ丢数据的问题,
主要是同步还是异步刷盘、断电是否导致的。只要send反馈正确,确保发送被接收,receive时有反馈后才会删除数据;同步刷盘,或异步刷盘不断电的,就不会丢失消息,
程序对于发送反馈异常的,要记录;MQ对于receive无反馈的,有重发机制,可能会有一条数据发送多次的情况,要在程序中剔除。
7. RabbitMQ 常用命令
1. centos安装epel yum 源   
# rpm -ivh  
http://dl.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
2. 安装erlang运行环境   
# yum install erlang
3. 安装rabbitmq server
# rpm --import
http://www.rabbitmq.com/rabbitmq-signing-key-public.asc  
# rpm -ivh  http://www.rabbitmq.com/releases ... -3.0.0-1.noarch.rpm
5. 打开server  
  # chkconfig rabbitmq-server on  
# rabbitmqctl status
会报异常:  
# rabbitmqctl status  
Status of node rabbit@devnote ...  
Error: unable to connect to node rabbit@devnote: nodedown   
DIAGNOSTICS
===========   
nodes in question: [rabbit@devnote]   
hosts, their running nodes and ports:
- devnote: [{rabbitmqctl24923,51045}]  
current node details:  
- node name: rabbitmqctl24923@devnote
- home dir: /var/lib/rabbitmq  
- cookie hash: TblHThacrBHJzl5Vt7Y4Ww==
执行命令:  
# /sbin/service rabbitmq-server stop
# /sbin/service rabbitmq-server start
# rabbitmqctl status 测试正确   
查看所有队列信息  # rabbitmqctl list_queues
关闭应用  # rabbitmqctl stop_app  
启动应用,和上述关闭命令配合使用,达到清空队列的目的
# rabbitmqctl start_app
清除所有队列  
# rabbitmqctl reset  
更多用法及参数,可以执行如下命令查看
# rabbitmqctl  
1)首先关闭rabbitmq: rabbitmqctl stop_app
(2)还原: rabbitmqctl reset
3)启动: rabbitmqctl start_app  
(3)添加用户: rabbitmqctl add_user root root  
(4)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
(6)查看用户: rabbitmqctl list_users
8. RabbitMQ(消息中间件)在工作中的应用场景有如下几种:
  1、跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。
  2、多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。
  3、应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。
  4、消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。
  5、应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。
  6、跨局域网,甚至跨城市的通讯(CDN行业),比如北京机房与广州机房的应用程序的通信。
  这里还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能
9. RabbitMQ解释
1.Broker
简单来说就是消息队列服务器的实体。
2.Exchange
接收消息,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
3.Queue
消息队列载体,用来存储消息,相同属性的 queue 可以重复定义,每个消息都会被投入到一个或多个队列。
4.Binding
绑定,它的作用就是把 Exchange Queue 按照路由规则绑定起来。
5.RoutingKey
路由关键字,Exchange 根据这个关键字进行消息投递。
6.Producter
消息生产者,产生消息的程序。
7.Consumer
消息消费者,接收消息的程序。
8.Channel
消息通道,在客户端的每个连接里可建立多个 Channel,每个 channel 代表一个会话。






欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2