ActiveMQ是一款功能強大的消息服務器,它支持許多種開發語言,例如java, C, C++, C#等等。企業級消息服務器無論對服務器穩定性還是速度,要求都很高,而ActiveMQ的分布式集群則能很好的滿足這一需求,下面說說ActiveMQ的幾種集群配置。
Queue consumer clusters
此集群讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費信息,則未消費掉的消息將被發給其他正常的消費者,結構圖如下:

Broker clusters
此種配置是一個消費者連接到多個broker集群的中的一個broker,當該broker出問題時,消費者自動連接到其他一個正常的broker。消費者使用 failover:// 協議來連接broker。
failover:(tcp://localhost:61616,tcp://localhost:61617)
failover官網介紹 http://activemq.apache.org/failover-transport-reference.html
broker之間的通過靜態發現(static discovery)和動態發現(dynamic discovery)來維持彼此發現,下面來介紹靜態發現和動態發現的機制:
靜態發現:
靜態發現通過配置固定的broker uri來發現彼此,配置語法如下:
static:(uri1,uri2,uri3,...)?options
例如:
static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100
更多靜態發現介紹,見ActiveMQ官網 http://activemq.apache.org/static-transport-reference.html
動態發現:
動態發現機制是在各個broker啟動時通過Fanout transport來發現彼此,配置舉例如下:
1 <broker name="foo">2 <transportConnectors>3 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>4 </transportConnectors>5 ...6 </broker>
更多動態發現機制介紹,見官網http://activemq.apache.org/discovery-transport-reference.html
Networks of brokers
多個broker組成集群,當其中一個broker的消費者出問題導致消息堆積無法消費掉時,通過ActiveMQ支持的Network of Broker方案可將該broker堆積的消息轉發到其他有消費者的broker。該方案主要有以下兩種配置方式:
1、為broker配置文件配置networkConnector元素
2、使用發現機制互相探測broker
Here is an example of using the fixed list of URIs:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker brokerName="receiver" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <!-- Static discovery --> 8 <networkConnector uri="static:(tcp://localhost:62001)"/> 9 <!-- MasterSlave Discovery -->10 <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->11 </networkConnectors>12 13 <persistenceAdapter>14 <memoryPersistenceAdapter/>15 </persistenceAdapter>16 17 <transportConnectors>18 <transportConnector uri="tcp://localhost:62002"/>19 </transportConnectors>20 </broker>21 22 </beans>
This example uses multicast discovery:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker name="sender" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <networkConnector uri="multicast://default"/> 8 </networkConnectors> 9 10 <persistenceAdapter>11 <memoryPersistenceAdapter/>12 </persistenceAdapter>13 14 <transportConnectors>15 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>16 </transportConnectors>17 </broker>18 19 </beans>
Master Slave
通過部署多個broker實例,一個master和多個slave關系的broker來達到高可用性,有三種方案:
1、Master-Slave2、SharedFile System Master Slave3、JDBCMaster Slave
第一種方案由于只可以由兩個AMQ實例組件,實際應用場景并不廣泛;第三種方案支持N個AMQ實例組網,但他的性能會受限于數據庫;第二種方案同樣支持N個AMQ實例組網,基于kahadb存儲策略,亦可以部署在分布式文件系統上,應用靈活、高效且安全。
Master Slave方案當其中一個broker啟動并拿到獨占鎖時自動成為master,其他后續的broker則一直等待鎖,當master宕機釋放鎖時其他slave拿到獨占鎖則自動成為master,部署結構如下:

第二種方案的配置只需修改config文件夾下activemq.xml文件,修改消息持久化使用的方案:
1 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">2 ...3 <persistenceAdapter>4 <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>5 </persistenceAdapter>6 ...7 </broker>
消息生產者代碼:
1 public class P2PSender { 2 PRivate static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :連接工廠,JMS用它創建連接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客戶端到JMS Provider的連接 8 Connection connection = null; 9 // session:一個發送或接收消息的線程10 Session session;11 // Destination :消息的目的地;消息發送給誰.12 Destination destination;13 // MessageProducer:消息發送者14 MessageProducer producer;15 // TextMessage message;16 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現17 connectionFactory = new ActiveMQConnectionFactory(18 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");19 try {20 // 構造從工廠得到連接對象21 connection = connectionFactory.createConnection();22 // 啟動23 connection.start();24 // 獲取操作連接25 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);26 destination = session.createQueue(QUEUE);27 // 獲取session,FirstQueue是一個服務器的queue destination = session.createQueue("FirstQueue");28 // 得到消息生成者【發送者】29 producer = session.createProducer(destination);30 // 設置不持久化31 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);32 // 構造消息33 sendMessage(session, producer);34 // session.commit();35 connection.close();36 } catch (Exception e) {37 e.printStackTrace();38 } finally {39 if (null != connection) {40 try {41 connection.close();42 } catch (JMSException e) {43 e.printStackTrace();44 }45 }46 }47 }48 49 public static void sendMessage(Session session, MessageProducer producer) throws Exception {50 for (int i = 1; i <= 1; i++) {51 Date d = new Date();52 TextMessage message = session.createTextMessage("ActiveMQ發送消息" + i + " " + new Date());53 System.out.println("發送消息:ActiveMQ發送的消息" + i + " " + new Date());54 producer.send(message);55 }56 }57 }消息消費者代碼:
1 public class P2PReceiver { 2 private static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :連接工廠,JMS用它創建連接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客戶端到JMS Provider的連接 8 Connection connection = null; 9 // Session:一個發送或接收消息的線程10 Session session;11 // Destination :消息的目的地;消息發送給誰.12 Destination destination;13 // 消費者,消息接收者14 MessageConsumer consumer;15 connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");16 try {17 // 得到連接對象18 connection = connectionFactory.createConnection();19 // 啟動20 connection.start();21 // 獲取操作連接22 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);23 // 創建Queue24 destination = session.createQueue(QUEUE);25 consumer = session.createConsumer(destination);26 while (true) {27 TextMessage message = (TextMessage) consumer.receive();28 if (null != message) {29 System.out.println("收到消息" + message.getText());30 }31 }32 } catch (Exception e) {33 e.printStackTrace();34 } finally {35 try {36 if (null != connection)37 connection.close();38 } catch (Throwable ignore) {39 }40 }41 }42 }新聞熱點
疑難解答