前一篇文章我們整體上學(xué)習(xí)了JMS,這篇文章我們來(lái)寫寫小demo實(shí)踐一下
在寫之前我們要下載安裝ActiveMQ服務(wù),下載地址當(dāng)然可以去官網(wǎng)下載,但我下載下來(lái)的有linux 和win兩個(gè)版本但是win的只有32位的,所以這里給一個(gè)win32、64的下載地址
下載ActiveMQ5.9
ActiveMQ安裝很簡(jiǎn)單,下載解壓后到bin目錄就有win32 和win64兩個(gè)目錄按照自己的系統(tǒng)進(jìn)入后就有activemq.bat來(lái)啟動(dòng)ActiveMQ服務(wù)
一、點(diǎn)對(duì)點(diǎn)消息模型實(shí)例
使用queue作為目的之
1、消息發(fā)送端
package mqtest1;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.MessagePRoducer;import javax.jms.Queue;import javax.jms.session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Producer { public static void main(String[] args) { int i =0; //鏈接工廠 ActiveMQConnectionFactory connectionFactory = null; //鏈接對(duì)象 Connection connection = null; //會(huì)話 Session session = null; //隊(duì)列(目的地、生產(chǎn)者發(fā)送消息的目的地) Queue queue = null; //消息生產(chǎn)者 MessageProducer producer = null; connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個(gè)參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動(dòng)提交 //參數(shù)二,表示的是簽收模式,一般使用的有自動(dòng)簽收和客戶端自己確認(rèn)簽收 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test_queue"); //為隊(duì)列創(chuàng)建消息生產(chǎn)者 producer = session.createProducer(queue); //消息是否為持久性的,這個(gè)不設(shè)置也是可以的,默認(rèn)是持久的 //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息設(shè)置為持久的發(fā)送后及時(shí)服務(wù)關(guān)閉了再次開啟消息也不會(huì)丟失。 //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //發(fā)送后如果服務(wù)關(guān)閉再次開啟則消息會(huì)丟失。 while (true){ //創(chuàng)建消息 TextMessage message = session.createTextMessage(); message.setText("測(cè)試隊(duì)列消息"+i); //發(fā)送消息到目的地 producer.send(message); i++; if(i>10) { break; } } session.commit(); System.out.println("呵呵消息發(fā)送結(jié)束"); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //釋放資源 //producer.close(); //session.close(); //connection.close(); } }}2、消息消費(fèi)端package mqtest1;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receive { public static void main(String[] args) { // 鏈接工廠 ActiveMQConnectionFactory connectionFactory = null; // 鏈接對(duì)象 Connection connection = null; // 會(huì)話 Session session = null; // 隊(duì)列(目的地,消費(fèi)者消費(fèi)消息的地方) Queue queue = null; // 消息消費(fèi)者 MessageConsumer consumer = null; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); // 創(chuàng)建session是的true 和false session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test_queue"); // 隊(duì)列(目的地,消費(fèi)者消費(fèi)消息的地方) consumer = session.createConsumer(queue); // 消息消費(fèi)者 // Message message = consumer.receive(); //同步方式接收 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String value = textMessage.getText(); System.out.println("value: " + value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}點(diǎn)對(duì)點(diǎn)模型Destination作為目的地1、消息發(fā)送端
package mq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;public class TestMQ { public static void main(String[] args) { int i =0; //鏈接工廠 ConnectionFactory connectionFactory = null; // 鏈接對(duì)象 Connection connection = null; // 會(huì)話對(duì)象 Session session = null; // 目的地 Destination destination = null; // 消息生產(chǎn)者 MessageProducer producer = null; connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個(gè)參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動(dòng)提交 //參數(shù)二,表示的是簽收模式,一般使用的有自動(dòng)簽收和客戶端自己確認(rèn)簽收 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test-queue"); //為目的地創(chuàng)建消息生產(chǎn)者 producer = session.createProducer(destination); //消息是否為持久性的,這個(gè)不設(shè)置也是可以的,默認(rèn)是持久的 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TestBean tbean = new TestBean(); tbean.setAge(25); tbean.setName("hellojava" +i); producer.send(session.createObjectMessage(tbean)); i++; if( i>10) { break; } } System.out.println("呵呵消息已發(fā)送"); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }}2、消息消費(fèi)端package mq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;public class AcceptMq {public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個(gè)發(fā)送或接收消息的線程 Session session = null; // Destination :消息的目的地;消息發(fā)送給誰(shuí). Destination destination = null; // 消費(fèi)者,消息接收者 //MessageConsumer consumer = null; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616"); try { //通過(guò)工廠創(chuàng)建鏈接 connection = connectionFactory.createConnection(); //啟動(dòng)鏈接 connection.start(); //創(chuàng)建會(huì)話 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //消息目的地 destination = session.createQueue("test-queue"); //消息消費(fèi)者 MessageConsumer consumer = session.createConsumer(destination); //同步方式接受信息,如果還沒(méi)有獲取到則會(huì)阻塞直到接收到信息 /*Message messages = consumer.receive(); TestBean value =(TestBean)((ObjectMessage)messages).getObject(); String name = value.getName();*/ consumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message){ try { TestBean tbean =(TestBean)((ObjectMessage)message).getObject(); System.out.println("tbean: "+tbean); if(null != message) { System.out.println("收到信息1: "+tbean.getName()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}3、bean 類package mq;import java.io.Serializable;public class TestBean implements Serializable{private int age;private String name;public TestBean() {};public TestBean(int age, String name) { this.age = age; this.name = name;}public int getAge() { return age;}public void setAge(int age) { this.age = age;}public String getName() { return name;}public void setName(String name) { this.name = name;}}二、發(fā)布/訂閱消息模型實(shí)例
1、消息發(fā)布端
package mq;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;public class PSMQ { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創(chuàng)建話題 Topic topic = session.createTopic("myTopic.messages"); //為話題創(chuàng)建消息生產(chǎn)者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); } } }2、消息訂閱端package mq;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.MessageListener;import org.apache.activemq.ActiveMQConnectionFactory;public class PSAccept { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創(chuàng)建話題 Topic topic = session.createTopic("myTopic.messages"); //為話題創(chuàng)建消費(fèi)者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }ok 三個(gè)demo很簡(jiǎn)單,在項(xiàng)目中導(dǎo)入jms相關(guān)的jar包,代碼復(fù)制過(guò)去就能跑了,在看看第一篇文章就大體上能明白各個(gè)類是干什么用的。點(diǎn)對(duì)點(diǎn)消息模型和發(fā)布/訂閱消息模型兩種方式其實(shí)不同的就是使用隊(duì)列、還是使用話題創(chuàng)建目的地不同其他的都一樣。
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");其中第一個(gè)admin是用戶名第二個(gè)是密碼而第三個(gè)參數(shù)就是協(xié)議+ip+port(端口),這幾個(gè)參數(shù)兩個(gè)客戶端都是一樣的不然消費(fèi)端就獲取不到了……
說(shuō)到密碼我們順便來(lái)看看ActiveMQ訪問(wèn)密碼的設(shè)置
三、ActiveMQ訪問(wèn)密碼設(shè)置
在ActiveMQ的conf目錄的activemq.xml中添加賬號(hào)密碼
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="whd" passWord="123" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> activemq.xml中添加位置:
ok這樣我們對(duì)這個(gè)ActiveMQ設(shè)置了一個(gè)用戶名密碼,所以在創(chuàng)建鏈接的時(shí)候要修改admin這個(gè)默認(rèn)的用戶名密碼為修改后的用戶名密碼。
connectionFactory = new ActiveMQConnectionFactory("whd", "123","tcp://192.168.0.104:61616");這樣我們就能正常的向服務(wù)器發(fā)送消息而消費(fèi)端也能從服務(wù)商消費(fèi)消息了……差點(diǎn)忘了,還有一個(gè)ActiveMQ管理頁(yè)面地址:http://127.0.0.1:8161/admin/ 訪問(wèn)這個(gè)地址登陸管理頁(yè)面,默認(rèn)用戶名密碼都是admin
好了先就小高興一下吧,之后接著來(lái)深入學(xué)習(xí)……
|
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注