RabbitMQ (有几种消息模式自己去看,这里不做解释)1. 使用RabbitMQ的一些经验1. consume 时预取参数的大小对consume性能影响很大,具体可参见官方博客; 2. 队列HA的代价非常高,特别是对带宽的占用,有限制的使用HA,且只提供两备份即可; 3. 磁盘也可能形成瓶颈,如果单台机器队列很多,确认只在必要时才使用duration,避免把磁盘跑满; 4. 队列的消息大量累积后,发送和消费速度都会受到影响,导致服务进一步恶化,采用的方法是,额外的脚本监控每个队列的消息数,超过限额会执行purge操作,简单粗暴但是有效的保证了服务稳定; 5. 限制单条消息大小,我们的限制是128k,消息队列只走消息,其他交由存储去做; 6. 用iptables适当的限制连接; 2. RabbitMQ队列超长导致QueueingConsumerJVM溢出我们的服务器使用RabbitMQ作为消息中转的容器,一般RabbitMQ进程占用内存不过100M-200M,这些队列超长的RabbitMQ进程可以占用超过2G的内存。 显然消息队列的消费者出现了问题。开发查看日志发现作为该队列消费者的Java服务的日志也卡住了,重启服务后(这点做得不对,应该用jstat、jstack进行排查,而不是直接重启)又很快卡住。 通过jstat发现JVM内存都耗尽了,之后进入无尽的Full GC,所以当然不会处理队列消息和输出日志信息了。 使用jmap导出这时候的Java堆栈,命令:jmap -dump:format=b,file=29389.hprof 29389。将得到的dump文件放到MAT(Eclipse Memory Analyzer)里进行分析,发现很明显是QueueingConsumer持有大量对象导致JVM内存溢出 解决方案将basicQos设置为16后重启服务,队列终于开始消化了。用jstat 观察JVM内存使用情况,也没有再出现剧增溢出的现象。 总结:使用RabbitMQ的时候,一定要合理地设置QoS参数。RabbitMQ的默认做法其实是很脆弱的,容易导致雪崩。 3. RabbitMQ数据速率问题在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上)。 在千兆网下,以500KB一条数据为例,读写速率均能达到200条/s,约为100MB/s。 在只写不读的情况下:写入速率瓶颈在于硬盘写入速度。 4. RabbitMQ数据存储路径变更到D盘方法Windows环境下,在安装前设置环境变量:RABBITMQ_BASE=D:\RabbitMQ_Data 5. RabbitMQ磁盘写满重启后数据丢失问题表现:磁盘写满后发送、读取程序均不能连接服务。 解决方法:将Queue、Exchange设置为Durable即不会发生数据丢失问题。 通过a.关闭服务;b.删除占位文件、erl_crash.dump;c.重启服务 三步操作后,磁盘会清理出10M左右空间,此时读取数据程序便可正常工作。 正确设计的架构,应确保RabbitMQ不会发生磁盘写满崩溃的情况。 6. RabbitMQ集群在网络带宽占满的情况下,通过集群的方式解决吞吐量不足的问题需要多台效果才明显。 假设外设吞吐率为d条/s,外设 向RabbitMQ1发送的概率为r1, 向RabbitMQ2发送的概率为r2, RabbitMQ1需要向RabbitMQ2转发的概率为r3, RabbitMQ2需要向RabbitMQ1转发的概率为r3。 那么RabbitMQ1进入的吞吐率为:(r1*d + r4*r2*d) 条/s ≈ 3d/4条/s, RabbitMQ2进入的吞吐率为:(r2*d + r3*r1*d) 条/s ≈ 3d/4条/s; 这样的确比只使用一台RabbitMQ的吞吐率d条/s要求低些。 N台RabbitMQ的集群,每台的平均吞吐率为:(2N-1)d/(N*N) 条/s;N=3时,平均吞吐率为5d/9条/s;N=4时,平均吞吐率为7d/16条/s。 解决方法:多台RabbitMQ服务器提供服务,在客户端以轮循方式访问服务,若1台down掉则不使用此台的队列服务,服务器之间没有联系,这样N台RabbitMQ的平均吞吐率为:1d/N 条/s。具体实现可以,专写一个用户收发RabbitMQ消息的jar/dll,在配置文件里填写RabbitMQ机器地址,使用轮循询问、收发的方式,提供给应用程序以黑盒方式调用。 下面提供了java版本的收发实现。 发送端sender.xml配置: <!-- 处理器相关 --> <bean id="sender" class="demo.Sender"> <property name="templates"> <list> <ref bean="template1" /> <!-- <ref bean="template2" /> --> </list> </property> </bean> <bean id="timeFlicker" class="demo.TimeFlicker"> <property name="handlers"> <list> <ref bean="sender" /> </list> </property> </bean> <!-- 处理器相关 --> <!-- amqp配置 相关 --> <rabbit:connection-factory id="connectionFactory1" host="192.1.11.108" username="guest" password="guest" virtual-host="/" /> <rabbit:connection-factory id="connectionFactory2" host="192.1.11.172" username="guest" password="guest" virtual-host="/" /> <!-- amqp配置 相关 --> <!-- 发送相关 --> <rabbit:template id="template1" connection-factory="connectionFactory1" exchange="exchange" /> <rabbit:template id="template2" connection-factory="connectionFactory2" exchange="exchange" /> <!-- 发送相关 --> |
说明:这里配置了两个RabbitMQ服务器,timeFlicker的目的是过一段时间把不能服务的RabbitMQ服务器重新添加到列表中,重试发送。 接收端receiver.xml配置: <!-- amqp配置 相关 --> <rabbit:connection-factory id="connectionFactory1" host="192.1.11.108" username="guest" password="guest" virtual-host="/" /> <rabbit:connection-factory id="connectionFactory2" host="192.1.11.172" username="guest" password="guest" virtual-host="/" /> <!-- amqp配置 相关 --> <!-- 监听相关 --> <bean id="Recv1" class="demo.Recv1" /> <rabbit:listener-container id="Listener1" connection-factory="connectionFactory1" prefetch="1" acknowledge="auto"> <rabbit:listener ref="Recv1" method="listen" queue-names="queue1" /> </rabbit:listener-container> <bean id="Recv2" class="demo.Recv2" /> <rabbit:listener-container id="Listener" connection-factory="connectionFactory1" prefetch="1" acknowledge="auto"> <rabbit:listener ref="Recv2" method="listen" queue-names="queue2" /> </rabbit:listener-container> <!-- 监听相关 --> |
MQ丢数据的问题, 主要是同步还是异步刷盘、断电是否导致的。只要send反馈正确,确保发送被接收,receive时有反馈后才会删除数据;同步刷盘,或异步刷盘不断电的,就不会丢失消息, 程序对于发送反馈异常的,要记录;MQ对于receive无反馈的,有重发机制,可能会有一条数据发送多次的情况,要在程序中剔除。 7. RabbitMQ 常用命令1. 给centos安装epel yum 源 # rpm -ivh http://dl.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm 2. 安装erlang运行环境 # yum install erlang 3. 安装rabbitmq server # rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc 5. 打开server # chkconfig rabbitmq-server on # rabbitmqctl status 会报异常: # rabbitmqctl status Status of node rabbit@devnote ... Error: unable to connect to node rabbit@devnote: nodedown DIAGNOSTICS =========== nodes in question: [rabbit@devnote] hosts, their running nodes and ports: - devnote: [{rabbitmqctl24923,51045}] current node details: - node name: rabbitmqctl24923@devnote - home dir: /var/lib/rabbitmq - cookie hash: TblHThacrBHJzl5Vt7Y4Ww== 执行命令: # /sbin/service rabbitmq-server stop # /sbin/service rabbitmq-server start # rabbitmqctl status 测试正确 查看所有队列信息 # rabbitmqctl list_queues 关闭应用 # rabbitmqctl stop_app 启动应用,和上述关闭命令配合使用,达到清空队列的目的 # rabbitmqctl start_app 清除所有队列 # rabbitmqctl reset 更多用法及参数,可以执行如下命令查看 # rabbitmqctl (1)首先关闭rabbitmq: rabbitmqctl stop_app (2)还原: rabbitmqctl reset (3)启动: rabbitmqctl start_app (3)添加用户: rabbitmqctl add_user root root (4)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*" (6)查看用户: rabbitmqctl list_users 8. RabbitMQ(消息中间件)在工作中的应用场景有如下几种: 1、跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。 2、多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。 3、应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。 4、消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。 5、应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。 6、跨局域网,甚至跨城市的通讯(CDN行业),比如北京机房与广州机房的应用程序的通信。 这里还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能 9. RabbitMQ解释1.Broker 简单来说就是消息队列服务器的实体。 2.Exchange 接收消息,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。 3.Queue 消息队列载体,用来存储消息,相同属性的 queue 可以重复定义,每个消息都会被投入到一个或多个队列。 4.Binding 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。 5.RoutingKey 路由关键字,Exchange 根据这个关键字进行消息投递。 6.Producter 消息生产者,产生消息的程序。 7.Consumer 消息消费者,接收消息的程序。 8.Channel 消息通道,在客户端的每个连接里可建立多个 Channel,每个 channel 代表一个会话。
|