Hello World
加入依赖
<!-- ActiveMQ client library: provides org.apache.activemq.ActiveMQConnectionFactory, which the Producer and Consumer examples import --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.5</version> </dependency>
代码
Producer
package com.junglone.simple.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; // Created by JungLone on 2018/03/23 14:28. public class Producer { static final String STR_BROKER_URL = "failover:(tcp://10.253.1.238:61616,tcp://10.253.1.238:61626,tcp://10.253.1.238:61636)"; static final String STR_DESTINATION = "test-MSG-Group-hello"; public void sendMessage() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(STR_BROKER_URL); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue(STR_DESTINATION); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 0; i < 1000; i++) { // Create a messages // String text = "test-MSG-Group ------ 这是一个测试消息! ---- Hello world! From: ------- NO. " + (i + 1); String text = "NO. 
" + (i + 1) + " -- [{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交换机(虚机)'}," + "{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火墙'}," + "{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'负载均衡'}," + "{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他设备'}," + "{'flag':'1','value':'6','label':'路由器'}," + "{'flag':'1','value':'4','label':'交换机'}," + "{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'服务器'}," + "{'flag':'1','value':'7','label':'安全设备'}," + "{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'}," + "{'flag':'1','value':'5','label':'防火墙(模块)'}," + "{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'机箱'}]"; TextMessage message = session.createTextMessage(text); // Messages Groups 配置信息 // message.setStringProperty("JMSXGroupID", "Test-MSG-Group"); // 设置消息分组 // message.setIntProperty("JMSXGroupSeq", -1); // 关闭分组 // Tell the producer to send the message System.out.println("Sent message: "+ message.getText()); try { producer.send(message); } catch (ResourceAllocationException e) { Thread.sleep(20 * 1000); producer.send(message); } } // Clean up session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } public static void main(String[] args) { Producer producer = new Producer(); producer.sendMessage(); } }
Consumer
package com.junglone.simple.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Description: * Created by JungLone on 2018/03/23 14:29. */ public class Consumer { public void receiveMessage() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.STR_BROKER_URL); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue(Producer.STR_DESTINATION); // Create a MessageConsumer from the Session to the Topic or Queue MessageConsumer consumer = session.createConsumer(destination); System.out.println("---- hello ----"); // 异步接收 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = null; try { text = textMessage.getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Received: " + text); } } }); // // Create a MessageConsumer from the Session to the Topic or Queue // MessageConsumer consumer1 = session.createConsumer(destination); // // System.out.println("---- world ----"); // // // 异步接收 // consumer1.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message message) { // // if (message instanceof TextMessage) { // TextMessage textMessage = (TextMessage) message; // String text = null; // try { // text = textMessage.getText(); // } catch (JMSException e) { // e.printStackTrace(); // } // System.out.println("this is consumer1 Received: " + text); // } // } // }); Thread.sleep(6 * 10 * 1000); // // Wait for a message 同步接收,接收到消息之前一直等待,接收到一条消息后就结束了 // Message message = consumer.receive(); // // if (message instanceof TextMessage) { // 
TextMessage textMessage = (TextMessage) message; // String text = textMessage.getText(); // System.out.println("Received: " + text); // } else { // System.out.println("Received: " + message); // } System.out.println("-------- world --------"); consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } public static void main(String[] args) { Consumer consumer = new Consumer(); consumer.receiveMessage(); } }
以上代码发送和接收的是 Queue(点对点,P2P)消息。若要改用 Topic 消息,只需把创建目的地(Destination)的代码换成下面的写法,其余代码保持不变:
// Create the destination (Topic or Queue) Destination destination = session.createTopic(Producer.STR_DESTINATION);