国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

ActiveMQ的幾種集群配置

2019-11-14 23:09:42
字體:
來源:轉載
供稿:網友
ActiveMQ的幾種集群配置

ActiveMQ是一款功能強大的消息服務器,它支持許多種開發語言,例如java, C, C++, C#等等。企業級消息服務器無論對服務器穩定性還是速度,要求都很高,而ActiveMQ的分布式集群則能很好的滿足這一需求,下面說說ActiveMQ的幾種集群配置。

Queue consumer clusters

此集群讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費信息,則未消費掉的消息將被發給其他正常的消費者,結構圖如下:

Broker clusters

此種配置是一個消費者連接到多個broker集群的中的一個broker,當該broker出問題時,消費者自動連接到其他一個正常的broker。消費者使用 failover:// 協議來連接broker。

failover:(tcp://localhost:61616,tcp://localhost:61617)

failover官網介紹 http://activemq.apache.org/failover-transport-reference.html

broker之間的通過靜態發現(static discovery)和動態發現(dynamic discovery)來維持彼此發現,下面來介紹靜態發現和動態發現的機制:

靜態發現:

靜態發現通過配置固定的broker uri來發現彼此,配置語法如下:

static:(uri1,uri2,uri3,...)?options

例如:

static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

更多靜態發現介紹,見ActiveMQ官網 http://activemq.apache.org/static-transport-reference.html

動態發現:

動態發現機制是在各個broker啟動時通過Fanout transport來發現彼此,配置舉例如下:

1 <broker name="foo">2   <transportConnectors>3     <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>4   </transportConnectors>5   ...6 </broker>

更多動態發現機制介紹,見官網http://activemq.apache.org/discovery-transport-reference.html

Networks of brokers

多個broker組成集群,當其中一個broker的消費者出問題導致消息堆積無法消費掉時,通過ActiveMQ支持的Network of Broker方案可將該broker堆積的消息轉發到其他有消費者的broker。該方案主要有以下兩種配置方式:

1、為broker配置文件配置networkConnector元素

2、使用發現機制互相探測broker

Here is an example of using the fixed list of URIs:

 1 <?xml version="1.0" encoding="UTF-8"?> 2   3 <beans xmlns="http://activemq.org/config/1.0"> 4   5   <broker brokerName="receiver" persistent="false" useJmx="false">  6     <networkConnectors> 7       <!-- Static discovery --> 8       <networkConnector uri="static:(tcp://localhost:62001)"/> 9       <!-- MasterSlave Discovery -->10       <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->11     </networkConnectors>12  13     <persistenceAdapter>14       <memoryPersistenceAdapter/>15     </persistenceAdapter>16  17    <transportConnectors>18       <transportConnector uri="tcp://localhost:62002"/>19     </transportConnectors>20   </broker>21  22 </beans>

This example uses multicast discovery:

 1 <?xml version="1.0" encoding="UTF-8"?> 2   3 <beans xmlns="http://activemq.org/config/1.0"> 4   5   <broker name="sender" persistent="false" useJmx="false">  6     <networkConnectors> 7       <networkConnector uri="multicast://default"/> 8     </networkConnectors> 9  10     <persistenceAdapter>11       <memoryPersistenceAdapter/>12     </persistenceAdapter>13  14   <transportConnectors>15       <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>16     </transportConnectors>17   </broker>18  19 </beans>

Master Slave

通過部署多個broker實例,一個master和多個slave關系的broker來達到高可用性,有三種方案:

1、Master-Slave2、SharedFile System Master Slave3、JDBCMaster Slave

第一種方案由于只可以由兩個AMQ實例組件,實際應用場景并不廣泛;第三種方案支持N個AMQ實例組網,但他的性能會受限于數據庫;第二種方案同樣支持N個AMQ實例組網,基于kahadb存儲策略,亦可以部署在分布式文件系統上,應用靈活、高效且安全。

Master Slave方案當其中一個broker啟動并拿到獨占鎖時自動成為master,其他后續的broker則一直等待鎖,當master宕機釋放鎖時其他slave拿到獨占鎖則自動成為master,部署結構如下:

第二種方案的配置只需修改config文件夾下activemq.xml文件,修改消息持久化使用的方案:

1 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">2   ...3     <persistenceAdapter>4             <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>5     </persistenceAdapter>6     ...7 </broker>

消息生產者代碼:

 1 public class P2PSender { 2     PRivate static final String QUEUE = "client1-to-client2"; 3  4     public static void main(String[] args) { 5         // ConnectionFactory :連接工廠,JMS用它創建連接 6         ConnectionFactory connectionFactory; 7         // Connection :JMS客戶端到JMS Provider的連接 8         Connection connection = null; 9         // session:一個發送或接收消息的線程10         Session session;11         // Destination :消息的目的地;消息發送給誰.12         Destination destination;13         // MessageProducer:消息發送者14         MessageProducer producer;15         // TextMessage message;16         // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現17         connectionFactory = new ActiveMQConnectionFactory(18                 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");19         try {20             // 構造從工廠得到連接對象21             connection = connectionFactory.createConnection();22             // 啟動23             connection.start();24             // 獲取操作連接25             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);26             destination = session.createQueue(QUEUE);27             // 獲取session,FirstQueue是一個服務器的queue destination = session.createQueue("FirstQueue");28             // 得到消息生成者【發送者】29             producer = session.createProducer(destination);30             // 設置不持久化31             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);32             // 構造消息33             sendMessage(session, producer);34             // session.commit();35             connection.close();36         } catch (Exception e) {37             e.printStackTrace();38         } finally {39             if (null != connection) {40                 try {41                     connection.close();42                 } catch (JMSException e) {43                     e.printStackTrace();44                 }45             }46         }47     }48 49     public static void sendMessage(Session session, MessageProducer producer) throws Exception {50         for (int i = 1; i <= 1; i++) {51             Date d = new Date();52             TextMessage message = session.createTextMessage("ActiveMQ發送消息" + i + "  " + new Date());53             System.out.println("發送消息:ActiveMQ發送的消息" + i + "  " + new Date());54             producer.send(message);55         }56     }57 }

消息消費者代碼:

 1 public class P2PReceiver { 2     private static final String QUEUE = "client1-to-client2"; 3      4     public static void main(String[] args) { 5         // ConnectionFactory :連接工廠,JMS用它創建連接 6         ConnectionFactory connectionFactory; 7         // Connection :JMS客戶端到JMS Provider的連接 8         Connection connection = null; 9         // Session:一個發送或接收消息的線程10         Session session;11         // Destination :消息的目的地;消息發送給誰.12         Destination destination;13         // 消費者,消息接收者14         MessageConsumer consumer;15         connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");16         try {17             // 得到連接對象18             connection = connectionFactory.createConnection();19             // 啟動20             connection.start();21             // 獲取操作連接22             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);23             // 創建Queue24             destination = session.createQueue(QUEUE);25             consumer = session.createConsumer(destination);26             while (true) {27                 TextMessage message = (TextMessage) consumer.receive();28                 if (null != message) {29                     System.out.println("收到消息" + message.getText());30                 }31             }32         } catch (Exception e) {33             e.printStackTrace();34         } finally {35             try {36                 if (null != connection)37                     connection.close();38             } catch (Throwable ignore) {39             }40         }41     }42 }


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 稷山县| 廉江市| 得荣县| 济南市| 垫江县| 积石山| 织金县| 西充县| 克山县| 申扎县| 兴业县| 兴宁市| 黄骅市| 巴塘县| 蓝山县| 临汾市| 开远市| 胶南市| 上林县| 化德县| 吉安市| 临沧市| 五峰| 绍兴市| 太康县| 漯河市| 淳安县| 河曲县| 潼关县| 吴桥县| 城口县| 偏关县| 德格县| 涞源县| 府谷县| 德惠市| 镇平县| 洪湖市| 响水县| 嘉荫县| 榆树市|