虚拟主题(Virtual Destinations / Virtual Topics)

  1. 概述

    对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
    对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
    参考资料:
    https://blog.csdn.net/kimmking/article/details/9773085
    http://activemq.apache.org/virtual-destinations.html

  2. 代码示例

    package com.junglone.simple.topic;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Description: 虚拟 Topic 使用
     * <br/>
     * 对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
     * 对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
     * 例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。
     * 可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。
     * 又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。
     * 每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
     * <br/>
     * 参考地址:https://blog.csdn.net/kimmking/article/details/9773085
     * <br/>
     * 1.虚拟主题(Virtual Topics)
     *  ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。
     *  存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:
     *  1. 同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。
     *      queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。
     *  2. 同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
     * 为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。
     * 对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
     * 对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
     * 例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。
     * 可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。
     * 又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。
     * 每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
     * Created by JungLone on 2018/04/10 16:06.
     */
    public class VirtualTopics {
    
        private static final String STR_BROKER_URL
                = "failover:(tcp://10.253.1.44:61616)";
    
        public static void main(String[] args) {
            try {
    
                ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory(STR_BROKER_URL);
    
                Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA());
                ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();
                conn.start();
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                MessageConsumer consumer1 = session.createConsumer(queue);
                MessageConsumer consumer2 = session.createConsumer(queue);
                MessageConsumer consumer3 = session.createConsumer(new ActiveMQQueue(getVirtualTopicConsumerNameB()));
                final AtomicInteger aint1 = new AtomicInteger(0);
                MessageListener listenerA = new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println(aint1.incrementAndGet()
                                    + " => receive from " + getVirtualTopicConsumerNameA() + ": " + message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                consumer1.setMessageListener(listenerA);
                consumer2.setMessageListener(listenerA);
    
                final AtomicInteger aint2 = new AtomicInteger(0);
                MessageListener listenerB = new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println(aint2.incrementAndGet()
                                    + " => receive from " + getVirtualTopicConsumerNameB() + ": " + message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                consumer3.setMessageListener(listenerB);
    
                MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
                int index = 0;
                while (index++ < 100) {
                    TextMessage message = session.createTextMessage(index + " message.");
                    producer.send(message);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        protected static String getVirtualTopicName() {
            return "VirtualTopic.TEST";
        }
    
        protected static String getVirtualTopicConsumerNameA() {
            return "Consumer.A.VirtualTopic.TEST";
        }
    
        protected static String getVirtualTopicConsumerNameB() {
            return "Consumer.B.VirtualTopic.TEST";
        }
    }
    

results matching ""

    No results matching ""