本帖最后由 longxf_sjz 于 2019-8-11 10:08 编辑
1. Spring整合JMS 1.1发布订阅模式 1.1.1消息生产者 创建maven工程,添加SpringJms 、activeMQ依赖 [XML] 纯文本查看 复制代码 <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
创建spring配置文件 [XML] 纯文本查看 复制代码 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.135:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,导入索引库-->
<bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="pinyougou_queue_solr"/>
</bean>
1.1.2 消息生产者自定义类发送消息 [Java] 纯文本查看 复制代码 jmsTemplate.send(queueSolrDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(jsonString);
}
});
1.1.3消息消费者 创建maven工程,添加SpringJms 、activeMQ依赖 [XML] 纯文本查看 复制代码 <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
创建spring配置文件 [XML] 纯文本查看 复制代码 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.135:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,导入索引库-->
<bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="pinyougou_queue_solr"/>
</bean>
<!-- 消息监听容器 导入索引库-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueSolrDestination" />
<property name="messageListener" ref="itemSearchListener" />
</bean>
1.1.4 消息消费方自定义监听类 [Java] 纯文本查看 复制代码 @Component
public class ItemSearchListener implements MessageListener {
@Autowired
private ItemSearchService itemSearchService;
@Override
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
String text = textMessage.getText();//json字符串
System.out.println("监听到消息:"+text);
List<TbItem> itemList = JSON.parseArray(text, TbItem.class);
itemSearchService.importList(itemList);
System.out.println("导入到solr索引库");
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2 ActiveMQ管理平台 列表各列信息含义如下: Number Of Pending Messages:当前未出队列的数量。 Number Of Consumers:这个是正在监听的消费者数量 Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 MessagesDequeued:出了队列的消息 可以理解为是消费这消费掉的数量。
|