Hello World

  1. 加入依赖

    <!--ActiveMQ-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.14.5</version>
    </dependency>
    
  2. 代码

    • Producer

      package com.junglone.simple.queue;
      
      import org.apache.activemq.ActiveMQConnectionFactory;
      
      import javax.jms.*;
      
      // Created by JungLone on 2018/03/23 14:28.
      public class Producer {
      
          static final String STR_BROKER_URL
                  = "failover:(tcp://10.253.1.238:61616,tcp://10.253.1.238:61626,tcp://10.253.1.238:61636)";
      
          static final String STR_DESTINATION = "test-MSG-Group-hello";
      
          public void sendMessage() {
              try {
                  // Create a ConnectionFactory
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(STR_BROKER_URL);
      
                  // Create a Connection
                  Connection connection = connectionFactory.createConnection();
                  connection.start();
      
                  // Create a Session
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
                  // Create the destination (Topic or Queue)
                  Destination destination = session.createQueue(STR_DESTINATION);
      
                  // Create a MessageProducer from the Session to the Topic or Queue
                  MessageProducer producer = session.createProducer(destination);
                  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
      
                  for (int i = 0; i < 1000; i++) {
      
                      // Create a messages
                      // String text = "test-MSG-Group ------ 这是一个测试消息! ---- Hello world! From: ------- NO. " + (i + 1);
                      String text = "NO. " + (i + 1) + " -- [{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交换机(虚机)'}," +
                              "{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火墙'}," +
                              "{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'负载均衡'}," +
                              "{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他设备'}," +
                              "{'flag':'1','value':'6','label':'路由器'}," +
                              "{'flag':'1','value':'4','label':'交换机'}," +
                              "{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'服务器'}," +
                              "{'flag':'1','value':'7','label':'安全设备'}," +
                              "{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'}," +
                              "{'flag':'1','value':'5','label':'防火墙(模块)'}," +
                              "{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'机箱'}]";
                      TextMessage message = session.createTextMessage(text);
      
                      // Messages Groups 配置信息
                      // message.setStringProperty("JMSXGroupID", "Test-MSG-Group"); // 设置消息分组
                      // message.setIntProperty("JMSXGroupSeq", -1); // 关闭分组
      
                      // Tell the producer to send the message
                      System.out.println("Sent message: "+ message.getText());
      
                      try {
      
                          producer.send(message);
                      } catch (ResourceAllocationException e) {
                          Thread.sleep(20 * 1000);
                          producer.send(message);
                      }
      
                  }
      
                  // Clean up
                  session.close();
                  connection.close();
              }
              catch (Exception e) {
                  System.out.println("Caught: " + e);
                  e.printStackTrace();
              }
          }
      
          public static void main(String[] args) {
              Producer producer = new Producer();
              producer.sendMessage();
          }
      
      }
      
    • Consumer

      package com.junglone.simple.queue;
      
      import org.apache.activemq.ActiveMQConnectionFactory;
      
      import javax.jms.*;
      
      /**
       * Description:
       * Created by JungLone on 2018/03/23 14:29.
       */
      public class Consumer {
      
          public void receiveMessage() {
              try {
      
                  // Create a ConnectionFactory
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.STR_BROKER_URL);
      
                  // Create a Connection
                  Connection connection = connectionFactory.createConnection();
                  connection.start();
      
                  // Create a Session
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
                  // Create the destination (Topic or Queue)
                  Destination destination = session.createQueue(Producer.STR_DESTINATION);
      
                  // Create a MessageConsumer from the Session to the Topic or Queue
                  MessageConsumer consumer = session.createConsumer(destination);
      
                  System.out.println("---- hello ----");
      
                  // 异步接收
                  consumer.setMessageListener(new MessageListener() {
                      @Override
                      public void onMessage(Message message) {
                          if (message instanceof TextMessage) {
                              TextMessage textMessage = (TextMessage) message;
                              String text = null;
                              try {
                                  text = textMessage.getText();
                              } catch (JMSException e) {
                                  e.printStackTrace();
                              }
                              System.out.println("Received: " + text);
                          }
                      }
                  });
      
                  // // Create a MessageConsumer from the Session to the Topic or Queue
                  // MessageConsumer consumer1 = session.createConsumer(destination);
                  //
                  // System.out.println("---- world ----");
                  //
                  // // 异步接收
                  // consumer1.setMessageListener(new MessageListener() {
                  //     @Override
                  //     public void onMessage(Message message) {
                  //
                  //         if (message instanceof TextMessage) {
                  //             TextMessage textMessage = (TextMessage) message;
                  //             String text = null;
                  //             try {
                  //                 text = textMessage.getText();
                  //             } catch (JMSException e) {
                  //                 e.printStackTrace();
                  //             }
                  //             System.out.println("this is consumer1 Received: " + text);
                  //         }
                  //     }
                  // });
      
                  Thread.sleep(6 * 10 * 1000);
      
                  // // Wait for a message 同步接收,接收到消息之前一直等待,接收到一条消息后就结束了
                  // Message message = consumer.receive();
                  //
                  // if (message instanceof TextMessage) {
                  //     TextMessage textMessage = (TextMessage) message;
                  //     String text = textMessage.getText();
                  //     System.out.println("Received: " + text);
                  // } else {
                  //     System.out.println("Received: " + message);
                  // }
      
                  System.out.println("-------- world --------");
      
                  consumer.close();
                  session.close();
                  connection.close();
              } catch (Exception e) {
                  System.out.println("Caught: " + e);
                  e.printStackTrace();
              }
          }
      
          public static void main(String[] args) {
      
              Consumer consumer = new Consumer();
              consumer.receiveMessage();
          }
      
      }
      
    • 以上代码为 Queue(P2P) 消息,创建 Topic 消息代码如下,其他相同

      // Create the destination (Topic or Queue)
      Destination destination = session.createTopic(Producer.STR_DESTINATION);
      

results matching ""

    No results matching ""