分组消息(Message Groups)
概述
ActiveMQ 5.3版本新增加了这个功能。Message Groups可以看做Exclusive Consumer的升级版,是一个可以并行的Exclusive Consumer。原理是通过使用JMSXGroupID来定义消息组。拥有相同的JMSXGroupID的消息将发送到同一个Queue。这样既可以保证消费者的高可用(因为可以有多个消费者消费同一队列),又可以保证消息按顺序消费,还不会像Exclusive Consumer一样浪费资源(需要额外的处于等待状态的消费者待命)。如果消费者被关闭或者消息组被关闭,这个拥有JMSXGroupID的消息会自动被发送到其他消费者。
设置JMSXGroupID的例子如下:
Mesasge message = session.createTextMessage("foo"); message.setStringProperty("JMSXGroupID", "your business key"); ... producer.send(message);
关闭消息组,通过设置JMSXGroupSeq的值为-1,例子如下:
Mesasge message = session.createTextMessage("foo"); message.setStringProperty("JMSXGroupID", "your business key"); message.setIntProperty("JMSXGroupSeq", -1); // ... producer.send(message);
可以使用JMSXGroupFirstForConsumer来判断这个消费者是否是第一次消费这个JMSXGroupID的消息:
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) { // flush cache for groupId }
如果Broker中已经有消息了,这时由于启动消费者的速度不一致,可能会导致某些消费者先启动并率先消费消息,导致A消费者的负载不均匀。可以在Broker中配置timeBeforeDispatchStarts,让消费者延迟一段时间再开始负载消费。或者配置consumersBeforeDispatchStarts,让消费者达到一定数量再开始负载消费。
具体配置如下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/> </policyEntries> </policyMap> </destinationPolicy>
设置有两个消费者都启动好或者2秒之后,在开始负载消费消息。
参考: https://my.oschina.net/u/719192/blog/293749 http://activemq.apache.org/message-groups.html
代码示例
见
1. Hello World
注释掉的 Message Group 部分// Messages Groups 配置信息 // message.setStringProperty("JMSXGroupID", "Test-MSG-Group"); // 设置消息分组 // message.setIntProperty("JMSXGroupSeq", -1); // 关闭分组