I. Download
Download page: http://activemq.apache.org/download.html
II. Single-Node Installation and Configuration
Official documentation:
Version 5 Getting Started: http://activemq.apache.org/version-5-getting-started.html
Unix binary installation: http://activemq.apache.org/version-5-getting-started.html#Version5GettingStarted-UnixBinaryInstallationUnixBinaryInstallation
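As a quick sketch of the Unix binary installation (the version number is illustrative; substitute the archive you actually downloaded):

tar zxvf apache-activemq-5.x.x-bin.tar.gz
cd apache-activemq-5.x.x
# run in the foreground to watch the log
bin/activemq console
# or run as a background daemon
bin/activemq start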
III. Simple High-Availability Configuration
Replicated LevelDB Store
Official documentation: http://activemq.apache.org/replicated-leveldb-store.html
Resource preparation
ZooKeeper cluster (all instances on a single machine)
Zookeeper1 192.168.0.3:2181
Zookeeper2 192.168.0.3:2182
Zookeeper3 192.168.0.3:2183
ActiveMQ
broker1 192.168.0.3:8161
broker2 192.168.0.3:8162
broker3 192.168.0.3:8163
Modify the relevant configuration files
ZooKeeper configuration
Zookeeper1 192.168.0.3:2181
Relevant zoo.cfg contents:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/tmp/zookeeper/server1/data
# the port at which the clients will connect
clientPort=2181
server.1=localhost:2166:3166
server.2=localhost:2167:3167
server.3=localhost:2168:3168
Zookeeper2 192.168.0.3:2182
Relevant zoo.cfg contents:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/tmp/zookeeper/server2/data
# the port at which the clients will connect
clientPort=2182
server.1=localhost:2166:3166
server.2=localhost:2167:3167
server.3=localhost:2168:3168
Zookeeper3 192.168.0.3:2183
Relevant zoo.cfg contents:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/tmp/zookeeper/server3/data
# the port at which the clients will connect
clientPort=2183
server.1=localhost:2166:3166
server.2=localhost:2167:3167
server.3=localhost:2168:3168
Note
When setting up the ZooKeeper cluster, the content of each instance's myid file must match the x in its corresponding server.x entry (see the sketch below).
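For the three instances above, a minimal sketch (assuming the dataDir directories already exist):

echo 1 > /home/tmp/zookeeper/server1/data/myid
echo 2 > /home/tmp/zookeeper/server2/data/myid
echo 3 > /home/tmp/zookeeper/server3/data/myid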
ActiveMQ configuration
broker1 192.168.0.3:8161
conf/jetty.xml — change the web console port:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
conf/activemq.xml — change the transport connector port:
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire-61616" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" enableStatusMonitor="true"/>
broker2 192.168.0.3:8162
conf/jetty.xml — change the web console port:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
conf/activemq.xml — change the transport connector port:
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire-61626" uri="tcp://0.0.0.0:61626?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" enableStatusMonitor="true"/>
broker3 192.168.0.3:8163
conf/jetty.xml — change the web console port:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
conf/activemq.xml — change the transport connector port:
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire-61636" uri="tcp://0.0.0.0:61636?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" enableStatusMonitor="true"/>
conf/activemq.xml — replicated LevelDB configuration (same on all three brokers):
<!-- Comment out the original kahaDB configuration:
<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<!-- Add the replicated LevelDB configuration -->
<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/activemq-data"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="192.168.0.3:2181,192.168.0.3:2182,192.168.0.3:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
        sync="local_disk"/>
</persistenceAdapter>
Startup
Start ZooKeeper first, then ActiveMQ; a sketch of the startup commands is below.
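A sketch of the startup sequence (paths are illustrative; each ZooKeeper instance and each broker is assumed to be a separate installation directory):

# start the three ZooKeeper instances first
/home/zookeeper/server1/bin/zkServer.sh start
/home/zookeeper/server2/bin/zkServer.sh start
/home/zookeeper/server3/bin/zkServer.sh start
# then start the three brokers
/home/activemq/broker1/bin/activemq start
/home/activemq/broker2/bin/activemq start
/home/activemq/broker3/bin/activemq start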
Access the ActiveMQ Web Console
Only one ActiveMQ broker serves requests at any given time, so only one of the three ports will have a reachable web console.
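Since only the current master accepts client connections, clients should use the failover transport so they reconnect automatically when the master changes. A minimal JMS producer sketch, assuming the three openwire ports configured above and the activemq-client jar on the classpath (the queue name is illustrative):

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class FailoverProducer {
    public static void main(String[] args) throws Exception {
        // failover: transparently reconnects to whichever broker is currently master
        String url = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.3:61626,tcp://192.168.0.3:61636)";
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("TEST.QUEUE"); // illustrative queue name
        MessageProducer producer = session.createProducer(queue);
        producer.send(session.createTextMessage("hello"));
        connection.close();
    }
}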
IV. MasterSlave Configuration
1. Shared File System Master Slave
How it works:
The first broker to grab the exclusive lock on the file is the master broker. If that broker dies and releases the lock then another broker takes over. The slave brokers sit in a loop trying to grab the lock from the master broker.
Configuration
http://activemq.apache.org/shared-file-system-master-slave.html
<persistenceAdapter>
    <kahaDB directory="/sharedFileSystem/sharedBrokerData"/>
</persistenceAdapter>
This approach depends on a shared file system that all brokers can access.
2. JDBC Master Slave
How it works
On startup one master grabs an exclusive lock in the broker database - all other brokers are slaves and pause waiting for the exclusive lock.
If the master loses its connection to the database or loses the exclusive lock then it immediately shuts down. If a master shuts down or fails, one of the other slaves will grab the lock.
One of the other slaves immediately grabs the exclusive lock on the database, then commences becoming the master, starting all of its transport connectors.
Clients lose their connection to the stopped master and the failover transport then tries to connect to the available brokers - of which the only one available is the new master.
At any time you can restart other brokers, which join the cluster and start as slaves, waiting to become a master if the master is shut down or a failure occurs.
Configuration
<beans>
    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
    <broker xmlns="http://activemq.apache.org/schema/core">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="FOO.>">
                        <dispatchPolicy>
                            <strictOrderDispatchPolicy/>
                        </dispatchPolicy>
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy/>
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data"/>
            <!--
            <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#oracle-ds"/>
            -->
        </persistenceAdapter>
        <transportConnectors>
            <transportConnector name="default" uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
    <!-- This xbean configuration file supports all the standard spring xml configuration options -->
    <!-- Postgres DataSource Sample Setup -->
    <!--
    <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
        <property name="serverName" value="localhost"/>
        <property name="databaseName" value="activemq"/>
        <property name="portNumber" value="0"/>
        <property name="user" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="dataSourceName" value="postgres"/>
        <property name="initialConnections" value="1"/>
        <property name="maxConnections" value="10"/>
    </bean>
    -->
    <!-- MySql DataSource Sample Setup -->
    <!--
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->
    <!-- Oracle DataSource Sample Setup -->
    <!--
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
        <property name="username" value="scott"/>
        <property name="password" value="tiger"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->
    <!-- Embedded Derby DataSource Sample Setup -->
    <!--
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean>
    -->
</beans>
Notes
- The data source bean is a sibling of the broker element, not nested inside it;
- The dataSource attribute value must match the id of the corresponding bean (see the sketch after this list);
- Use class="org.apache.commons.dbcp2.BasicDataSource" instead of the sample's class="org.apache.commons.dbcp.BasicDataSource", since the jar shipped in the lib directory is dbcp2;
- Copy the MySQL driver jar into the ${activemq.base}/lib/ directory.
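Putting those notes together, a minimal sketch of the MySQL wiring (URL and credentials are illustrative, taken from the sample above; note the dbcp2 class name and the dataSource attribute referencing the bean id):

<persistenceAdapter>
    <jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter>
...
<!-- a sibling of the broker element, not nested inside it -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>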
Change the database character set to Latin1: alter database activemq character set Latin1;
Without this change you will see an error like:
com.mysql.jdbc.exceptions.MySQLSyntaxErrorException: Table 'activemq.activemq_acks' doesn't exist.
3. Replicated LevelDB Store
See the Replicated LevelDB Store configuration in section III above.
V. Networks of Brokers Configuration
Configuration documentation: http://activemq.apache.org/networks-of-brokers.html
Configuring message flow between brokers
<!-- Allow messages to flow back and forth between brokers -->
<policyEntry queue=">" enableAudit="false">
    <networkBridgeFilterFactory>
        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
    </networkBridgeFilterFactory>
</policyEntry>
...
<networkConnectors>
    <!-- conduitSubscriptions="false": do not treat the consumers connected to another broker as a single consumer
         duplex="true": duplex mode (messages flow in both directions over one connector) -->
    <networkConnector name="broker1" uri="static:(tcp://10.253.1.50:61616,tcp://10.253.1.53:61616)" duplex="true"/>
</networkConnectors>
Notes on the message-flow configuration above:
- A message only flows between two brokers: once a message on A has flowed to B, it will not flow on to C;
- After messages flow between brokers, the order of messages in a Queue is no longer guaranteed;
- A broker prefers local consumption: it consumes its local messages first, and only then the messages that flowed in from other brokers.