最近的項目中用到了mq,之前自己一直在碼農一樣的照葫蘆畫瓢。最近幾天研究了下,把自己所有看下來的文檔和了解總結一下。
一. 認識JMS1.概述對于JMS,百度百科,是這樣介紹的:JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
簡短來說,JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity),提供了應用程序之間異步通信的功能。
JMS1.0是jsr 194里規定的規范(關于jsr規范,請點擊)。目前最新的規范是JSR 343,JMS2.0。
好了,說了這么多,其實只是在說,JMS只是sun公司為了統一廠商的接口規范,而定義出的一組api接口。
2. JMS體系結構描述如下:
啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基于web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。
2. 用Java訪問ActiveMQ先附上Bean代碼:
public class MqBean implements Serializable{PRivate Integer age;private String name;public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}}2.1 隊列消息的發送:public static void main(String[] args) {ConnectionFactory connectionFactory;Connection connection;Session session;Destination destination;MessageProducer producer;connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");try {connection = connectionFactory.createConnection();connection.start();//第一個參數是是否是事務型消息,設置為true,第二個參數無效//第二個參數是//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。異常也會確認消息,應該是在執行之前確認的//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會刪除消息。可以在失敗的//時候不確認消息,不確認的話不會移出隊列,一直存在,下次啟動繼續接受。接收消息的連接不斷開,其他的消費者也不會接受(正常情況下隊列模式不存在其他消費者)//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。//待測試session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);destination = session.createQueue("test-queue");producer = session.createProducer(destination);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢MqBean bean = new MqBean();bean.setAge(13);for(int i=0;i<100;i++){bean.setName("小黃"+i);producer.send(session.createObjectMessage(bean));}producer.close();System.out.println("呵呵");} catch (JMSException e) {e.printStackTrace();}}注:在上面的代碼中,確認模式有三種,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直沒明白有什么區別。因為無法測試。不過大概也明白了一些。其實主要是MQ處理消息的流程決定的:
這些步驟是連續的,所以任何步驟都可能成為消息從生成方客戶端到使用方客戶端的傳送過程的瓶頸。這些步驟中的大多數都取決于消息傳送系統的物理特征:網絡帶寬、計算機處理速度和消息服務器體系結構等等。但是,有一些步驟還取決于消息傳送應用程序的特征和該應用程序要求的可靠性級別。其實就是基于可靠性還是性能的選擇.
2.2 隊列消息的接收:public static void main(String[] args) {ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null;// Session: 一個發送或接收消息的線程 Session session;// Destination :消息的目的地;消息發送給誰. Destination destination;// 消費者,消息接收者 MessageConsumer consumer;connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");try {// 構造從工廠得到連接對象 connection = connectionFactory.createConnection();// 啟動 connection.start();// 獲取操作連接 //這個最好還是有事務session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue");consumer = session.createConsumer(destination);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {MqBean bean = (MqBean) ((ObjectMessage)message).getObject();System.out.println(bean);if (null != message) {System.out.println("收到消息" + bean.getName());}} catch (Exception e) {// TODO: handle exception}}});} catch (Exception e) {e.printStackTrace();}}注:對于隊列來說,比較簡單的優化策略,應該就是隊列分載了。由于每個消費者都是單線程的,所以可以設置多個消費者來提高速度。大家可以復制個消費者自己測試下,在消費者中添加sleep測試下效果。
2.3 訂閱消息的發送public static void main(String[] args) {ConnectionFactory connectionFactory;Connection connection;Session session;Destination destination;MessageProducer producer;connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");try {connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);destination = session.createTopic("test-topic");producer = session.createProducer(destination);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢MqBean bean = new MqBean();bean.setAge(13);for(int i=0;i<100;i++){Thread.sleep(1000);bean.setName("小黃"+i);producer.send(session.createObjectMessage(bean));}producer.close();System.out.println("呵呵");} catch (Exception e) {e.printStackTrace();}}2.4 訂閱消息的接收public static void main(String[] args) {ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null;// Session: 一個發送或接收消息的線程 Session session;// Destination :消息的目的地;消息發送給誰. Destination destination;// 消費者,消息接收者 MessageConsumer consumer;connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");try {// 構造從工廠得到連接對象 connection = connectionFactory.createConnection();// 啟動 connection.start();// 獲取操作連接 //這個最好還是有事務session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue");consumer = session.createConsumer(destination);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {MqBean bean = (MqBean) ((ObjectMessage)message).getObject();System.out.println(bean);if (null != message) {System.out.println("收到消息" + bean.getName());}} catch (Exception e) {// TODO: handle exception}}});} catch (Exception e) {e.printStackTrace();}}以上的消息發送后,如果沒有接收到,可以登錄自己的MQ管理頁面:http://192.168.3.159:8161/admin/ ,默認帳號密碼都是admin,查看隊列中的消息

Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量
新聞熱點
疑難解答