虚拟主题(Virtual Destinations / Virtual Topics)
概述
对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
参考资料:
https://blog.csdn.net/kimmking/article/details/9773085
http://activemq.apache.org/virtual-destinations.html代码示例
package com.junglone.simple.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * Description: 虚拟 Topic 使用 * <br/> * 对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。 * 对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。 * 例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。 * 可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。 * 又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。 * 每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。 * <br/> * 参考地址:https://blog.csdn.net/kimmking/article/details/9773085 * <br/> * 1.虚拟主题(Virtual Topics) * ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。 * 存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题: * 1. 同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。 * queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。 * 2. 同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。 * 为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。 * 对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。 * 对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。 * 例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。 * 可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。 * 又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。 * 每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。 * Created by JungLone on 2018/04/10 16:06. */ public class VirtualTopics { private static final String STR_BROKER_URL = "failover:(tcp://10.253.1.44:61616)"; public static void main(String[] args) { try { ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory(STR_BROKER_URL); Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA()); ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 = session.createConsumer(queue); MessageConsumer consumer2 = session.createConsumer(queue); MessageConsumer consumer3 = session.createConsumer(new ActiveMQQueue(getVirtualTopicConsumerNameB())); final AtomicInteger aint1 = new AtomicInteger(0); MessageListener listenerA = new MessageListener() { public void onMessage(Message message) { try { System.out.println(aint1.incrementAndGet() + " => receive from " + getVirtualTopicConsumerNameA() + ": " + message); } catch (Exception e) { e.printStackTrace(); } } }; consumer1.setMessageListener(listenerA); consumer2.setMessageListener(listenerA); final AtomicInteger aint2 = new AtomicInteger(0); MessageListener listenerB = new MessageListener() { public void onMessage(Message message) { try { System.out.println(aint2.incrementAndGet() + " => receive from " + getVirtualTopicConsumerNameB() + ": " + message); } catch (Exception e) { e.printStackTrace(); } } }; consumer3.setMessageListener(listenerB); MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName())); int index = 0; while (index++ < 100) { TextMessage message = session.createTextMessage(index + " message."); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } } protected static String getVirtualTopicName() { return "VirtualTopic.TEST"; } protected static String getVirtualTopicConsumerNameA() { return "Consumer.A.VirtualTopic.TEST"; } protected static String getVirtualTopicConsumerNameB() { return "Consumer.B.VirtualTopic.TEST"; } }