分组消息(Message Groups)

  1. 概述

    ActiveMQ 5.3版本新增加了这个功能。Message Groups可以看做Exclusive Consumer的升级版,是一个可以并行的Exclusive Consumer。原理是通过使用JMSXGroupID来定义消息组。拥有相同的JMSXGroupID的消息将发送到同一个Queue。这样既可以保证消费者的高可用(因为可以有多个消费者消费同一队列),又可以保证消息按顺序消费,还不会像Exclusive Consumer一样浪费资源(需要额外的处于等待状态的消费者待命)。如果消费者被关闭或者消息组被关闭,这个拥有JMSXGroupID的消息会自动被发送到其他消费者。

    设置JMSXGroupID的例子如下:

     Mesasge message = session.createTextMessage("foo");   
     message.setStringProperty("JMSXGroupID", "your business key");   
     ...
     producer.send(message);
    

    关闭消息组,通过设置JMSXGroupSeq的值为-1,例子如下:

     Mesasge message = session.createTextMessage("foo");
     message.setStringProperty("JMSXGroupID", "your business key");
     message.setIntProperty("JMSXGroupSeq", -1);
     // ...
     producer.send(message);
    

    可以使用JMSXGroupFirstForConsumer来判断这个消费者是否是第一次消费这个JMSXGroupID的消息:

     if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
        // flush cache for groupId
     }
    

    如果Broker中已经有消息了,这时由于启动消费者的速度不一致,可能会导致某些消费者先启动并率先消费消息,导致A消费者的负载不均匀。可以在Broker中配置timeBeforeDispatchStarts,让消费者延迟一段时间再开始负载消费。或者配置consumersBeforeDispatchStarts,让消费者达到一定数量再开始负载消费。

    具体配置如下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:

     <destinationPolicy>
       <policyMap>
         <policyEntries>
           <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
         </policyEntries>
       </policyMap>
     </destinationPolicy>
    

    设置有两个消费者都启动好或者2秒之后,在开始负载消费消息。

    参考: https://my.oschina.net/u/719192/blog/293749 http://activemq.apache.org/message-groups.html

  2. 代码示例

    1. Hello World 注释掉的 Message Group 部分

     // Messages Groups 配置信息
     // message.setStringProperty("JMSXGroupID", "Test-MSG-Group"); // 设置消息分组
     // message.setIntProperty("JMSXGroupSeq", -1); // 关闭分组
    

results matching ""

    No results matching ""