本帖最后由 长沙-小知姐姐 于 2019-1-3 09:24 编辑
DLQ-死信队列列(Dead Letter Queue)用来保存处理理失败或者过期的消息。
出现以下情况时,消息会被redelivered
1. A transacted session is usedand rollback() is called(使 用 一个事务session,并且调用了了rollback() 方法).
2. A transacted session is closedbefore commit is called( 一个事务session,关闭之前调 用了了 commit).
3. A session is usingCLIENT_ACKNOWLEDGE and Session.recover() is called(在session中使 用CLIENT_ACKNOWLEDGE签收模式,并且调 用了了Session.recover()方法).
当 一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的内容)次数时,会给broker发送 一个"Poison ack",这个消息被认为是a poison pill(毒丸),这时broker会将这个消息发送到DLQ,以便便后续处理理。
登录ActiveMQ管理理端可以看到ActiveMq有 一个默认的死信队:ActiveMQ.DLQ ,若未做设置则处理理失败的消息会自动进 入此队列列。缺省持久消息过期,会被送到DLQ, 非持久消息不不会送到DLQ。本 文将展示如何在Spring中引 入私信重发机制。
一.死信的产生原因
1.死信队列列堆积过多
测试的时候,每秒50条消息,堆积到3000 以上的时候,就会不不定时地报错
因为每次连接MQ的时候,都创建了了连接对象,占 用内存太多,建议使 用连接池,修改Spring 的配置
2. 网络故障::udp通信 网络通信不不正常会出现消息丢失,或挤压状况。
3.连接超时:ActiveMQ服务器 挂了了,要搭建集群,保证高可 用。
二.Spring + ActiveMQ配置 死信队列列(重发机制/延时发送)
1.ActiveMQ 部署时修改activemq.xml
在policyEntries节点中增加如下策略略配置。
2.服务端spring-mq配置 文件做如下配置
<?xmlversion="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"default-lazy-init="true">
<!-- ActiveMQ 连接 工 厂 -->
<!-- 真正可以产 生Connection的ConnectionFactory,由对应的 JMS服务 厂商提供--> <!--<amq:connectionFactoryid="amqConnectionFactory"
brokerURL="${brokerUrl}"
userName="${mq.userName}"password="${mq.password}"
redeliveryPolicy="activeMQPolicy"/>-->
<bean id="amqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerUrl}"></property><property name="userName" value="${mq.userName}"></property><property name="password" value="${mq.password}"></property><property name="redeliveryPolicy" ref="activeMQPolicy" /> <!-- 引
用重发机制 -->
</bean>
<!-- 加 入死信机制 -->
<bean id="activeMQPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次尝试重新发送失败后,增 长这个等待时间 -->
<property name="useExponentialBackOff" value="true"></property>
<!--重发次数,默认为6次 这 里里设置为1次 -->
<property name="maximumRedeliveries" value="1"></property>
<!--重发时间间隔,默认为1秒 -->
<property name="initialRedeliveryDelay" value="1000"></property>
<!--第 一次失败后重新发送之前等待500毫秒,第 二次失败再等待500 * 2毫秒,这 里里的2就是value -->
<property name="backOffMultiplier" value="2"></property>
<!--最 大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设 首次重连间隔为10ms,倍数为2,那么第
二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔 大的最 大重连时间间隔时,以后每次重连时间间隔都为最 大重连时间间隔。 -->
<property name="maximumRedeliveryDelay" value="1000"></property> <!---->
<property name="destination" ref="allDestination"></property>
</bean>
<!-- Spring Caching连接 工厂 -->
<!-- Spring 用于管理理真正的ConnectionFactory的ConnectionFactory--> <bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的
ConnectionFactory --> <property name="targetConnectionFactory"
ref="amqConnectionFactory"></property>
<!-- 同上,同理理 -->
<!-- <constructor-arg ref="amqConnectionFactory"/> --> <!-- Session缓存数量量 -->
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 定义Queue监听器 -->
<bean id="allDestination"
class="org.apache.activemq.command.ActiveMQQueue"> <constructor-argvalue="C2S.Q.TEST" />
</bean>
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" /> <property name="messageListener" ref="dataReceiver" /><!--dataReceiver
为MQ处理理类,根据实际情况配置-->
<property name="destination" ref="allDestination" /> <property name="sessionTransacted" value="true"/> </bean>
三. ActiveMQ配置死信队列列(丢弃机制)
需求场景:
由于测试环境应 用复杂的原因,造成了了jms死信队列列 一直挤压很多数据,从 而导致存储爆满,进 而造成了了各个客户端不不能正常发送消息。针对这些死信队列列,一般都没有利利 用价值的。
为了了避免某队列列的死信队列列的挤压,而使整个jms不不可 用,我们选择了了通过ActiveMQ的配置,直接丢弃掉死信队列列的消息。缺省死信队列列(Dead Letter Queue)叫做ActiveMQ.DLQ所有的未送达消息都会被发送到这个队列列,以致会非常难于管理理。
所以我们为了了防 止内存挤压导致mq不不可 用的情况发 生,就需要对死信进 行行配置。可以通过配置文件(activemq.xml)来调整死信发送策略略。
一些配置策略略:
1. 不不使用缺省的死信队列列 缺省所有队列列的死信消息都被发送到同 一个缺省死信队列列,不不便便于管理理。
可以通过individualDeadLetterStrategy或sharedDeadLetterStrategy策略略来进 行行修改。如下:
useQueueForQueueMessages: 设置使 用队列列保存死信,还可以设置useQueueForTopicMessages,使 用Topic来保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
...
</broker>
2. 非持久消息保存到死信队列
3. 过期消息不保存到死信队列
4. 持久消息不不保存到死信队列列
对于过期的,可以通过processExpired属性来控制,对于redelivered的失败的消息,需要通过插件来实现如下:
1)丢弃所有死信
2)丢弃指定 目的死信 <beans>
<broker ...>
<plugins>
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"/>
</plugins>
</broker>
</beans> 注意, 目的名称使 用空格分隔
3) 用正则表达式过滤丢弃消息
四.死信队列列消息的属性
死信队列列中的消息,会增加 几个属性, 比如原过期时间(originalExpiration),原originalDeliveryMode等。
五.死信队列列解决方案总结
1.重发:对于安全性要求比较高的系统,那需要将发送失败的消息进行重试发送,甚至在消息一直不能到达的情况下给予相关的邮件、短信等必要的告警措施以保证消息的正确投递。
2.丢弃:在消息不是很重要以及有其他通知手段的情况下,那么对消息做丢弃处理也不失为一种好办法,毕竟如果大量不可抵达的消息存在于消息系统中会对我们的系统造成非常大的负荷,所以我们也会采用丢弃的方式进行处理。
3.不管是 重发还是丢弃,都要根据实际的业务场景而定,不一而足,同学们在实际中要具体问题具体分析。
参考:
|