整合 Spring

  1. 加入依赖

     <!--ActiveMQ-->
     <!--5.11.4 以后的版本都引入的 springframework 包, 会与 spring 4.x 里的包冲突造成启动错误, 使用 spring 3.x 不会冲突-->
     <!--<dependency>-->
     <!--<groupId>org.apache.activemq</groupId>-->
     <!--<artifactId>activemq-all</artifactId>-->
     <!--<version>5.11.4</version>-->
     <!--</dependency>-->
     <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-all</artifactId>
         <version>5.14.5</version>
     </dependency>
     <!--5.15.3 需要 jdk 1.8-->
     <!--<dependency>-->
     <!--<groupId>org.apache.activemq</groupId>-->
     <!--<artifactId>activemq-all</artifactId>-->
     <!--<version>5.15.3</version>-->
     <!--</dependency>-->
     <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-pool2</artifactId>
         <version>2.4.2</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-jms</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-tx</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <!--spring 3.x 不需要这个依赖-->
     <!--<dependency>-->
     <!--<groupId>org.springframework</groupId>-->
     <!--<artifactId>spring-messaging</artifactId>-->
     <!--<version>${spring.framework.version}</version>-->
     <!--</dependency>-->
     <!--使用 activemq 5.11.4, 5.15.3 需要使用-->
     <!--<dependency>-->
     <!--<groupId>commons-pool</groupId>-->
     <!--<artifactId>commons-pool</artifactId>-->
     <!--<version>1.6</version>-->
     <!--</dependency>-->
    
     <!--spring-core-->
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-core</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-beans</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>commons-logging</groupId>
         <artifactId>commons-logging</artifactId>
         <version>1.1.3</version>
     </dependency>
    
     <!--spring-aop-->
     <dependency>
         <groupId>aopalliance</groupId>
         <artifactId>aopalliance</artifactId>
         <version>1.0</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-aop</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
    
     <!--spring-mvc-->
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-context</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-expression</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-web</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-webmvc</artifactId>
         <version>${spring.framework.version}</version>
     </dependency>
    
     <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
         <version>2.5</version>
         <scope>provided</scope>
     </dependency>
    
  2. web.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns="http://java.sun.com/xml/ns/javaee"
          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
          id="WebApp_ID" version="2.5">
     <display-name>hello-activemq-web</display-name>
    
     <context-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:config/applicationContext.xml</param-value>
     </context-param>
    
     <listener>
      <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
     </listener>
     <context-param>
      <param-name>log4jConfigLocation</param-name>
      <param-value>classpath:property/log4j.properties</param-value>
     </context-param>
     <listener>
      <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
     </listener>
    
     <filter>
      <filter-name>encodingFilter</filter-name>
      <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
      <init-param>
          <param-name>encoding</param-name>
          <param-value>UTF-8</param-value>
      </init-param>
      <init-param>
          <param-name>forceEncoding</param-name>
          <param-value>true</param-value>
      </init-param>
     </filter>
     <filter-mapping>
      <filter-name>encodingFilter</filter-name>
      <url-pattern>/*</url-pattern>
     </filter-mapping>
    
     <!-- 加载配置SpringMVC -->
     <servlet>
      <servlet-name>springMVC</servlet-name>
      <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
      <init-param>
          <param-name>contextConfigLocation</param-name>
          <param-value>classpath:config/springAnnotation-servlet.xml</param-value>
      </init-param>
      <load-on-startup>1</load-on-startup>
     </servlet>
    
     <servlet-mapping>
      <servlet-name>springMVC</servlet-name>
      <url-pattern>/</url-pattern>
     </servlet-mapping>
    
    </web-app>
    
  3. application.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
          http://www.springframework.org/schema/context
          http://www.springframework.org/schema/context/spring-context-3.0.xsd"
        default-lazy-init="true">
    
     <!-- 引入参数配置文件 -->
     <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
         <property name="locations">
             <list>
                 <value>classpath*:/property/*.properties</value>
             </list>
         </property>
     </bean>
    
     <!-- 组件扫描 -->
     <context:component-scan base-package="com.junglone"/>
    
     <!-- 自动装配 -->
     <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
     <import resource="spring-jms-mq.xml"/>
    </beans>
    
  4. spring-jms-mq.xml

     <?xml version="1.0" encoding="UTF-8"?>
     <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
    
         <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
             <property name="queueCapacity" value="128"/>
             <property name="corePoolSize" value="128"/>
             <property name="maxPoolSize" value="256"/>
             <property name="keepAliveSeconds" value="30"/>
         </bean>
    
         <!-- a pooling based JMS provider -->
         <bean id="clientConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
             <property name="connectionFactory">
                 <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                     <!--<property name="userName" value=""/>-->
                     <!--<property name="password" value=""/>-->
                     <property name="brokerURL">
                         <value>failover://(${mq_tcp})?initialReconnectDelay=10000</value>
                     </property>
    
                     <!--信任所有包-->
                     <property name="trustAllPackages" value="true"/>
                     <!--异步发送-->
                     <property name="useAsyncSend" value="true"/>
                     <!--签收-->
                     <property name="optimizeAcknowledge" value="true"/>
                     <!--签收延迟-->
                     <property name="optimizeAcknowledgeTimeOut" value="1000"/>
                     <!--异步 ack-->
                     <property name="sendAcksAsync" value="true"/>
                     <!--发送超时-->
                     <property name="sendTimeout" value="1000"/>
                     <!--线程池中线程的个数,默认为 Integer.MAX_VALUE-->
                     <property name="maxThreadPoolSize" value="256"/>
    
                     <property name="redeliveryPolicy">
                         <bean class="org.apache.activemq.RedeliveryPolicy">
                             <!--消息最大重发次数,默认6-->
                             <property name="maximumRedeliveries" value="3"/>
                             <!--重发时间间隔,默认为 1000L 毫秒-->
                             <property name="initialRedeliveryDelay" value="1000"/>
                             <!--是否在每次尝试重新发送失败后,增长这个等待时间-->
                             <property name="useExponentialBackOff" value="true"/>
                             <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value-->
                             <property name="backOffMultiplier" value="2"/>
                             <!--
                                 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),
                                 假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,
                                 当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
                             -->
                             <property name="maximumRedeliveryDelay" value="1000"/>
                         </bean>
                     </property>
                     <!--预取限制-->
                     <property name="prefetchPolicy">
                         <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                             <property name="queuePrefetch" value="1"/>
                             <property name="queueBrowserPrefetch" value="1"/>
                         </bean>
                     </property>
                     <!--<property name="clientID" value="testClientID"/>-->
                     <!--<property name="clientIDPrefix" value="testClientID&#45;&#45;&#45;&#45;"/>-->
                 </bean>
             </property>
             <property name="createConnectionOnStartup" value="true"/>
             <property name="maxConnections" value="128"/>
             <property name="reconnectOnException" value="true"/>
             <property name="maximumActiveSessionPerConnection" value="256"/>
             <property name="idleTimeout" value="20000"/>
         </bean>
    
         <bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
             <property name="connectionFactory" ref="clientConnectionFactory"/>
             <!--服务质量开关:如果是 true, deliveryMode, priority, timeToLive 的值将被使用, 否则使用默认的值。-->
             <property name="explicitQosEnabled" value="true"/>
             <!--NON_PERSISTENT非持久化 1, PERSISTENT持久化 2-->
             <property name="deliveryMode" value="2"/>
             <!--session事务-->
             <property name="sessionTransacted" value="false"/>
             <!--
                 SESSION_TRANSACTED = 0          事务提交并确认
                 AUTO_ACKNOWLEDGE = 1            自动确认
                 CLIENT_ACKNOWLEDGE = 2          客户端手动确认
                 DUPS_OK_ACKNOWLEDGE = 3         自动批量确认
                 INDIVIDUAL_ACKNOWLEDGE = 4      单条消息确认(自定义ACK_MODE)
                 消息确认模式, 4 spring-jms 自定义单条确认方式
             -->
             <property name="sessionAcknowledgeMode" value="4"/>
             <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
             <property name="pubSubDomain" value="false"/>
         </bean>
    
         <bean id="testActiveMQListener"
               class="org.springframework.jms.listener.DefaultMessageListenerContainer">
             <property name="connectionFactory" ref="clientConnectionFactory"/>
             <property name="taskExecutor" ref="threadPoolTaskExecutor"/>
             <property name="clientId" value="HelloWorldTest"/>
             <property name="destination">
                 <bean class="org.apache.activemq.command.ActiveMQQueue">
                     <!-- 队列名称 -->
                     <constructor-arg value="test-MSG-Group-hello"/>
                 </bean>
             </property>
             <property name="messageListener">
                 <bean class="com.junglone.controller.MQConsumerListener"/>
             </property>
             <!--
                 session 事务
                 实际消费时事务模式受此配置影响
                 使用 spring-jms 自定义单条确认方式的 ack 时需要关闭事务
             -->
             <property name="sessionTransacted" value="false"/>
             <!--
                 SESSION_TRANSACTED = 0          事务提交并确认
                 AUTO_ACKNOWLEDGE = 1            自动确认
                 CLIENT_ACKNOWLEDGE = 2          客户端手动确认
                 DUPS_OK_ACKNOWLEDGE = 3         自动批量确认
                 INDIVIDUAL_ACKNOWLEDGE = 4      单条消息确认(自定义ACK_MODE)
                 消息确认模式, 4 spring-jms 自定义单条确认方式
                 使用单条确认模式的时候需要在 listener 关闭 session 事务
             -->
             <property name="sessionAcknowledgeMode" value="4"/>
             <!--设置多线程-->
             <property name="concurrency" value="64-128"/>
         </bean>
    
     </beans>
    
  5. MQConsumerListener.java

     package com.junglone.controller;
    
     import org.apache.log4j.Logger;
     import org.springframework.jms.listener.adapter.MessageListenerAdapter;
    
     import javax.jms.JMSException;
     import javax.jms.Message;
     import javax.jms.Session;
     import javax.jms.TextMessage;
    
     /**
      * Description:
      * Created by JungLone on 2017/09/29 15:21.
      */
     public class MQConsumerListener extends MessageListenerAdapter {
    
         private static final Logger LOGGER = Logger.getLogger(MQConsumerListener.class);
    
         @Override
         public void onMessage(Message message, Session session) throws JMSException {
    
             TextMessage textMessage = (TextMessage) message;
             String strMsg = textMessage.getText();
    
             // LOGGER.info("message ---- " + JSON.toJSONString(message));
             // LOGGER.info("session ---- " + JSON.toJSONString(session));
    
             // if (!strMsg.startsWith("hello")) {
             //
             //     System.out.println("session.getTransacted() ==== " + session.getTransacted());
             //     System.out.println("ack mode ==== " + session.getAcknowledgeMode());
             //     System.out.println("strMsg ==== " + strMsg);
             //     session.rollback();
             //     // textMessage.acknowledge();
             // } else {
             //
             //     if (strMsg.contains("world")) {
             //         System.out.println("ack mode ==== " + session.getAcknowledgeMode());
             //         System.out.println("strMsg ==== " + strMsg);
             //         textMessage.acknowledge();
             //     } else {
             //         throw new RuntimeException("Exception------------------------");
             //     }
             //
             // }
    
             // System.out.println("ack mode ==== " + session.getAcknowledgeMode());
             // System.out.println("strMsg ==== " + strMsg);
             LOGGER.info("thread-name ---- " + Thread.currentThread().getName() + " ---- "
                     + " session.getTransacted() ---- " + session.getTransacted()
                     + " ---- session.getAcknowledgeMode() ---- " + session.getAcknowledgeMode() + " ---- " + strMsg);
             textMessage.acknowledge();
    
         }
     }
    
  6. Producer.java

     package com.junglone.sample;
    
     import com.junglone.JUnit4ClassRunner;
     import com.junglone.common.DateTimeUtil;
     import org.junit.Test;
     import org.junit.runner.RunWith;
     import org.springframework.jms.core.JmsTemplate;
     import org.springframework.test.context.ContextConfiguration;
    
     import javax.annotation.Resource;
    
     /**
      * Description:
      * Created by JungLone on 2018/04/24 11:37.
      */
    
     // 使用 JUnit4 进行测试
     @RunWith(JUnit4ClassRunner.class)
     // 加载配置文件
     @ContextConfiguration(locations = {"classpath:config/*.xml"})
     public class Producer {
    
         @Resource
         private JmsTemplate producerJmsTemplate;
    
         @Test
         public void produce() {
    
             String strQueueName = "test-MSG-Group-hello";
             String strNowTime = DateTimeUtil.getNowTimeNormalString();
             String strMsg = strNowTime + " -- [{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交换机(虚机)'}," +
                     "{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火墙'}," +
                     "{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'负载均衡'}," +
                     "{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他设备'}," +
                     "{'flag':'1','value':'6','label':'路由器'}," +
                     "{'flag':'1','value':'4','label':'交换机'}," +
                     "{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'服务器'}," +
                     "{'flag':'1','value':'7','label':'安全设备'}," +
                     "{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'}," +
                     "{'flag':'1','value':'5','label':'防火墙(模块)'}," +
                     "{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'机箱'}]";
    
             for (int i = 0; i < 10; i++) {
                 String strMsg1 = i + " -- " + strMsg;
                 producerJmsTemplate.convertAndSend(strQueueName, strMsg1);
    
                 System.out.println("strMsg -- " + strMsg1);
             }
    
         }
    
     }
    
  7. 完整代码地址:https://gitee.com/junglone/hello-activemq.git

results matching ""

    No results matching ""