整合 Spring
加入依赖
<!--ActiveMQ--> <!--5.11.4 以后的版本都引入的 springframework 包, 会与 spring 4.x 里的包冲突造成启动错误, 使用 spring 3.x 不会冲突--> <!--<dependency>--> <!--<groupId>org.apache.activemq</groupId>--> <!--<artifactId>activemq-all</artifactId>--> <!--<version>5.11.4</version>--> <!--</dependency>--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.5</version> </dependency> <!--5.15.3 需要 jdk 1.8--> <!--<dependency>--> <!--<groupId>org.apache.activemq</groupId>--> <!--<artifactId>activemq-all</artifactId>--> <!--<version>5.15.3</version>--> <!--</dependency>--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.4.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.framework.version}</version> </dependency> <!--spring 3.x 不需要这个依赖--> <!--<dependency>--> <!--<groupId>org.springframework</groupId>--> <!--<artifactId>spring-messaging</artifactId>--> <!--<version>${spring.framework.version}</version>--> <!--</dependency>--> <!--使用 activemq 5.11.4, 5.15.3 需要使用--> <!--<dependency>--> <!--<groupId>commons-pool</groupId>--> <!--<artifactId>commons-pool</artifactId>--> <!--<version>1.6</version>--> <!--</dependency>--> <!--spring-core--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.3</version> </dependency> <!--spring-aop--> <dependency> <groupId>aopalliance</groupId> <artifactId>aopalliance</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.framework.version}</version> </dependency> <!--spring-mvc--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-expression</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.framework.version}</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> <scope>provided</scope> </dependency>
web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5"> <display-name>hello-activemq-web</display-name> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:config/applicationContext.xml</param-value> </context-param> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <context-param> <param-name>log4jConfigLocation</param-name> <param-value>classpath:property/log4j.properties</param-value> </context-param> <listener> <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class> </listener> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <!-- 加载配置SpringMVC --> <servlet> <servlet-name>springMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:config/springAnnotation-servlet.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>springMVC</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>
application.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" default-lazy-init="true"> <!-- 引入参数配置文件 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath*:/property/*.properties</value> </list> </property> </bean> <!-- 组件扫描 --> <context:component-scan base-package="com.junglone"/> <!-- 自动装配 --> <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/> <import resource="spring-jms-mq.xml"/> </beans>
spring-jms-mq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="queueCapacity" value="128"/> <property name="corePoolSize" value="128"/> <property name="maxPoolSize" value="256"/> <property name="keepAliveSeconds" value="30"/> </bean> <!-- a pooling based JMS provider --> <bean id="clientConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!--<property name="userName" value=""/>--> <!--<property name="password" value=""/>--> <property name="brokerURL"> <value>failover://(${mq_tcp})?initialReconnectDelay=10000</value> </property> <!--信任所有包--> <property name="trustAllPackages" value="true"/> <!--异步发送--> <property name="useAsyncSend" value="true"/> <!--签收--> <property name="optimizeAcknowledge" value="true"/> <!--签收延迟--> <property name="optimizeAcknowledgeTimeOut" value="1000"/> <!--异步 ack--> <property name="sendAcksAsync" value="true"/> <!--发送超时--> <property name="sendTimeout" value="1000"/> <!--线程池中线程的个数,默认为 Integer.MAX_VALUE--> <property name="maxThreadPoolSize" value="256"/> <property name="redeliveryPolicy"> <bean class="org.apache.activemq.RedeliveryPolicy"> <!--消息最大重发次数,默认6--> <property name="maximumRedeliveries" value="3"/> <!--重发时间间隔,默认为 1000L 毫秒--> <property name="initialRedeliveryDelay" value="1000"/> <!--是否在每次尝试重新发送失败后,增长这个等待时间--> <property name="useExponentialBackOff" value="true"/> <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value--> <property name="backOffMultiplier" value="2"/> <!-- 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5), 假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms, 当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 --> <property name="maximumRedeliveryDelay" value="1000"/> </bean> </property> <!--预取限制--> <property name="prefetchPolicy"> <bean class="org.apache.activemq.ActiveMQPrefetchPolicy"> <property name="queuePrefetch" value="1"/> <property name="queueBrowserPrefetch" value="1"/> </bean> </property> <!--<property name="clientID" value="testClientID"/>--> <!--<property name="clientIDPrefix" value="testClientID----"/>--> </bean> </property> <property name="createConnectionOnStartup" value="true"/> <property name="maxConnections" value="128"/> <property name="reconnectOnException" value="true"/> <property name="maximumActiveSessionPerConnection" value="256"/> <property name="idleTimeout" value="20000"/> </bean> <bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="clientConnectionFactory"/> <!--服务质量开关:如果是 true, deliveryMode, priority, timeToLive 的值将被使用, 否则使用默认的值。--> <property name="explicitQosEnabled" value="true"/> <!--NON_PERSISTENT非持久化 1, PERSISTENT持久化 2--> <property name="deliveryMode" value="2"/> <!--session事务--> <property name="sessionTransacted" value="false"/> <!-- SESSION_TRANSACTED = 0 事务提交并确认 AUTO_ACKNOWLEDGE = 1 自动确认 CLIENT_ACKNOWLEDGE = 2 客户端手动确认 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认(自定义ACK_MODE) 消息确认模式, 4 spring-jms 自定义单条确认方式 --> <property name="sessionAcknowledgeMode" value="4"/> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false"/> </bean> <bean id="testActiveMQListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="clientConnectionFactory"/> <property name="taskExecutor" ref="threadPoolTaskExecutor"/> <property name="clientId" value="HelloWorldTest"/> <property name="destination"> <bean class="org.apache.activemq.command.ActiveMQQueue"> <!-- 队列名称 --> <constructor-arg value="test-MSG-Group-hello"/> </bean> </property> <property name="messageListener"> <bean class="com.junglone.controller.MQConsumerListener"/> </property> <!-- session 事务 实际消费时事务模式受此配置影响 使用 spring-jms 自定义单条确认方式的 ack 时需要关闭事务 --> <property name="sessionTransacted" value="false"/> <!-- SESSION_TRANSACTED = 0 事务提交并确认 AUTO_ACKNOWLEDGE = 1 自动确认 CLIENT_ACKNOWLEDGE = 2 客户端手动确认 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认(自定义ACK_MODE) 消息确认模式, 4 spring-jms 自定义单条确认方式 使用单条确认模式的时候需要在 listener 关闭 session 事务 --> <property name="sessionAcknowledgeMode" value="4"/> <!--设置多线程--> <property name="concurrency" value="64-128"/> </bean> </beans>
MQConsumerListener.java
package com.junglone.controller; import org.apache.log4j.Logger; import org.springframework.jms.listener.adapter.MessageListenerAdapter; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; /** * Description: * Created by JungLone on 2017/09/29 15:21. */ public class MQConsumerListener extends MessageListenerAdapter { private static final Logger LOGGER = Logger.getLogger(MQConsumerListener.class); @Override public void onMessage(Message message, Session session) throws JMSException { TextMessage textMessage = (TextMessage) message; String strMsg = textMessage.getText(); // LOGGER.info("message ---- " + JSON.toJSONString(message)); // LOGGER.info("session ---- " + JSON.toJSONString(session)); // if (!strMsg.startsWith("hello")) { // // System.out.println("session.getTransacted() ==== " + session.getTransacted()); // System.out.println("ack mode ==== " + session.getAcknowledgeMode()); // System.out.println("strMsg ==== " + strMsg); // session.rollback(); // // textMessage.acknowledge(); // } else { // // if (strMsg.contains("world")) { // System.out.println("ack mode ==== " + session.getAcknowledgeMode()); // System.out.println("strMsg ==== " + strMsg); // textMessage.acknowledge(); // } else { // throw new RuntimeException("Exception------------------------"); // } // // } // System.out.println("ack mode ==== " + session.getAcknowledgeMode()); // System.out.println("strMsg ==== " + strMsg); LOGGER.info("thread-name ---- " + Thread.currentThread().getName() + " ---- " + " session.getTransacted() ---- " + session.getTransacted() + " ---- session.getAcknowledgeMode() ---- " + session.getAcknowledgeMode() + " ---- " + strMsg); textMessage.acknowledge(); } }
Producer.java
package com.junglone.sample; import com.junglone.JUnit4ClassRunner; import com.junglone.common.DateTimeUtil; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.test.context.ContextConfiguration; import javax.annotation.Resource; /** * Description: * Created by JungLone on 2018/04/24 11:37. */ // 使用 JUnit4 进行测试 @RunWith(JUnit4ClassRunner.class) // 加载配置文件 @ContextConfiguration(locations = {"classpath:config/*.xml"}) public class Producer { @Resource private JmsTemplate producerJmsTemplate; @Test public void produce() { String strQueueName = "test-MSG-Group-hello"; String strNowTime = DateTimeUtil.getNowTimeNormalString(); String strMsg = strNowTime + " -- [{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交换机(虚机)'}," + "{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火墙'}," + "{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'负载均衡'}," + "{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他设备'}," + "{'flag':'1','value':'6','label':'路由器'}," + "{'flag':'1','value':'4','label':'交换机'}," + "{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'服务器'}," + "{'flag':'1','value':'7','label':'安全设备'}," + "{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'}," + "{'flag':'1','value':'5','label':'防火墙(模块)'}," + "{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'机箱'}]"; for (int i = 0; i < 10; i++) { String strMsg1 = i + " -- " + strMsg; producerJmsTemplate.convertAndSend(strQueueName, strMsg1); System.out.println("strMsg -- " + strMsg1); } } }