A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持 JMS(Java Message Service)1.1 和 J2EE 1.4 规范的 JMSProvider实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
ActiveMQ 消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

主要特点:
1.    多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
2.    完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA 消息,事务)
3.    对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用Spring 的系统里面去
4.    通过了常见 J2EE 服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 商业服务器上
5.    支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.    支持通过 JDBC 和 journal 提供高速的消息持久化
7.    从设计上保证了高性能的集群,客户端-服务器,点对点
8.    支持 Ajax
9.    支持与 Axis 的整合
10.    可以很容易得调用内嵌 JMS provider,进行测试

queue 的使用

Producer

[Java] 纯文本查看 复制代码
public class Producer {
	public static void main(String[] args) throws Exception {
		// 1. 创建 ActiveMQConnectionFactory 连接工厂,需要 ActiveMQ 的服务地址,使用的是 tcp 协议
		String brokerURL = "tcp://192.168.12.168:61616";
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
		// 2. 使用连接工厂创建连接
		Connection connection = factory.createConnection();
		// 3. 使用连接对象开启连接,使用 start 方法
		connection.start();
		// 4. 从连接对象里获取 session
		// 第一个参数的作用是,是否使用 JTA 分布式事务,设置为 false 不开启
		// 第二个参数是设置应答方式,如果第一个参数是 true,那么第二个参数就失效了,这里设置的应答方式是自动应答
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 5. 从 session 获取消息类型 Destination(模式(队列还是订阅),对应的名称),获取 queue(名称为
		// myqueue)
		// 参数就是设置队列名称
		Queue queue = session.createQueue("myqueue");
		// 6. 从 session 中获取消息的生产者
		MessageProducer producer = session.createProducer(queue);
		// 7. 创建消息体 使用 TextMessage 类型
		TextMessage textMessage = new ActiveMQTextMessage();

		// 设置消息的内容
		textMessage.setText("你好!传智播客。");
		// 8. 使用消息的生产者发送消息
		producer.send(textMessage);
		System.out.println("【生产者】已发送消息...--- 你好!传智播客。");
		// 9. 释放资源
		producer.close();
		session.close();
		connection.close();
	}
}

Consumer
[Java] 纯文本查看 复制代码
public class Consumer {
	public static void main(String[] args) throws Exception {
        // 1. 创建 ActiveMQConnectionFactory 连接工厂,需要 ActiveMQ 的服务地址,使用的是 tcp 协议
        String brokerURL = "tcp://192.168.12.168:61616";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
        // 2. 使用连接工厂创建连接
        Connection connection = factory.createConnection();
        // 3. 使用连接对象开启连接,使用 start 方法
        connection.start();
        // 4. 从连接对象里获取 session
        // 第一个参数的作用是,是否使用 JTA 分布式事务,设置为 false 不开启
        // 第二个参数是设置应答方式,如果第一个参数是 true,那么第二个参数就失效了,这里设置的应答方式是自动应答
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 5. 从 session 获取消息类型 Destination(模式(队列还是订阅),对应的名称),获取 queue(名称为 myqueue)
        // 参数就是设置队列名称
        Queue queue = session.createQueue("myqueue");
        // 6. 从 session 中获取消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        // 7.接受消息
        while (true) {
            // 参数表示接受消息等待的时间,单位是毫秒
            Message message = consumer.receive(2000);
            // 判断消息类型是 TextMessage
            if (message instanceof TextMessage) {
            // 如果是,则进行强转
            TextMessage textMessage = (TextMessage) message;
            // 8. 消费消息,打印消息内容
            System.out.println("消费者接收到消息:" + textMessage.getText());
        }
    }
}


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马