国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

JMS學(xué)習(xí)二(簡(jiǎn)單的ActiveMq實(shí)例)

2019-11-08 18:23:17
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

前一篇文章我們整體上學(xué)習(xí)了JMS,這篇文章我們來(lái)寫寫小demo實(shí)踐一下

在寫之前我們要下載安裝ActiveMQ服務(wù),下載地址當(dāng)然可以去官網(wǎng)下載,但我下載下來(lái)的有linux 和win兩個(gè)版本但是win的只有32位的,所以這里給一個(gè)win32、64的下載地址

下載ActiveMQ5.9

ActiveMQ安裝很簡(jiǎn)單,下載解壓后到bin目錄就有win32 和win64兩個(gè)目錄按照自己的系統(tǒng)進(jìn)入后就有activemq.bat來(lái)啟動(dòng)ActiveMQ服務(wù)

一、點(diǎn)對(duì)點(diǎn)消息模型實(shí)例

使用queue作為目的之

1、消息發(fā)送端

package mqtest1;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.MessagePRoducer;import javax.jms.Queue;import javax.jms.session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Producer {	public static void main(String[] args) {		int i =0;		//鏈接工廠		ActiveMQConnectionFactory connectionFactory = null;		//鏈接對(duì)象		Connection connection = null;		//會(huì)話		Session session = null;		//隊(duì)列(目的地、生產(chǎn)者發(fā)送消息的目的地)		Queue  queue = null;		//消息生產(chǎn)者		MessageProducer producer = null;		connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");		try {			connection = connectionFactory.createConnection();			connection.start();			//第一個(gè)參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動(dòng)提交			//參數(shù)二,表示的是簽收模式,一般使用的有自動(dòng)簽收和客戶端自己確認(rèn)簽收			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);		    queue  = session.createQueue("test_queue");			//為隊(duì)列創(chuàng)建消息生產(chǎn)者		     producer =  session.createProducer(queue);			//消息是否為持久性的,這個(gè)不設(shè)置也是可以的,默認(rèn)是持久的			//producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息設(shè)置為持久的發(fā)送后及時(shí)服務(wù)關(guān)閉了再次開啟消息也不會(huì)丟失。			//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //發(fā)送后如果服務(wù)關(guān)閉再次開啟則消息會(huì)丟失。			while (true){				//創(chuàng)建消息				TextMessage message = session.createTextMessage();				message.setText("測(cè)試隊(duì)列消息"+i);				//發(fā)送消息到目的地				producer.send(message);				i++;				if(i>10) {					break;				}			}			session.commit();			System.out.println("呵呵消息發(fā)送結(jié)束");		} catch (JMSException e) {			// TODO Auto-generated catch block			e.printStackTrace();		} finally  {			//釋放資源				//producer.close();				//session.close();				//connection.close();		}	}}2、消息消費(fèi)端

package mqtest1;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receive {	public static void main(String[] args) {		// 鏈接工廠		ActiveMQConnectionFactory connectionFactory = null;		// 鏈接對(duì)象		Connection connection = null;		// 會(huì)話		Session session = null;		// 隊(duì)列(目的地,消費(fèi)者消費(fèi)消息的地方)		Queue queue = null;		// 消息消費(fèi)者		MessageConsumer consumer = null;		connectionFactory = new ActiveMQConnectionFactory("admin", "admin",				"tcp://192.168.1.120:61616");		try {			connection = connectionFactory.createConnection();			connection.start();			// 創(chuàng)建session是的true 和false			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);			queue = session.createQueue("test_queue"); // 隊(duì)列(目的地,消費(fèi)者消費(fèi)消息的地方)			consumer = session.createConsumer(queue); // 消息消費(fèi)者			// Message message = consumer.receive(); //同步方式接收			consumer.setMessageListener(new MessageListener() {				@Override				public void onMessage(Message message) {					TextMessage textMessage = (TextMessage) message;					try {						String value = textMessage.getText();						System.out.println("value: " + value);					} catch (JMSException e) {						// TODO Auto-generated catch block						e.printStackTrace();					}				}			});		} catch (JMSException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}	}}點(diǎn)對(duì)點(diǎn)模型Destination作為目的地

1、消息發(fā)送端

package mq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;public class TestMQ {	public static void main(String[] args) {		int i =0;		//鏈接工廠		ConnectionFactory connectionFactory = null;		// 鏈接對(duì)象		Connection connection = null;		// 會(huì)話對(duì)象		Session session = null;		// 目的地		Destination destination = null;		// 消息生產(chǎn)者		MessageProducer producer = null;		connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");		try {			connection = connectionFactory.createConnection();			connection.start();			//第一個(gè)參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動(dòng)提交			//參數(shù)二,表示的是簽收模式,一般使用的有自動(dòng)簽收和客戶端自己確認(rèn)簽收			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);			destination = session.createQueue("test-queue");			//為目的地創(chuàng)建消息生產(chǎn)者			producer = session.createProducer(destination);			//消息是否為持久性的,這個(gè)不設(shè)置也是可以的,默認(rèn)是持久的			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);			while(true) {				TestBean tbean = new TestBean();				tbean.setAge(25);				tbean.setName("hellojava" +i);				producer.send(session.createObjectMessage(tbean));				i++;				if( i>10) {					break;				}			}			System.out.println("呵呵消息已發(fā)送");		} catch (JMSException e) {			// TODO Auto-generated catch block			e.printStackTrace();		} finally {			try {				producer.close();				session.close();				connection.close();			} catch (JMSException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	}}2、消息消費(fèi)端

package mq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;public class AcceptMq {public static void main(String[] args) {	ConnectionFactory connectionFactory;	// Connection :JMS 客戶端到JMS Provider 的連接  	Connection connection = null;	// Session: 一個(gè)發(fā)送或接收消息的線程  	Session session = null;	// Destination :消息的目的地;消息發(fā)送給誰(shuí).  	Destination destination = null;	// 消費(fèi)者,消息接收者  	//MessageConsumer consumer = null;	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616");	try {		//通過(guò)工廠創(chuàng)建鏈接		connection = connectionFactory.createConnection();		//啟動(dòng)鏈接		connection.start();		//創(chuàng)建會(huì)話		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);		//消息目的地		destination = session.createQueue("test-queue");		//消息消費(fèi)者		MessageConsumer consumer = session.createConsumer(destination);		//同步方式接受信息,如果還沒(méi)有獲取到則會(huì)阻塞直到接收到信息		/*Message messages = consumer.receive();		TestBean  value  =(TestBean)((ObjectMessage)messages).getObject();                 String name = value.getName();*/		consumer.setMessageListener(new MessageListener(){			@Override			public void onMessage(Message message){				try {				TestBean  tbean =(TestBean)((ObjectMessage)message).getObject();				System.out.println("tbean: "+tbean);				if(null != message) {					 System.out.println("收到信息1: "+tbean.getName());				}				} catch (JMSException e) {					// TODO Auto-generated catch block					e.printStackTrace();				}			}		});	} catch (JMSException e) {		// TODO Auto-generated catch block		e.printStackTrace();	} }}3、bean 類

package mq;import java.io.Serializable;public class TestBean implements Serializable{private int age;private String name;public TestBean() {};public TestBean(int age, String name) {	this.age = age;	this.name = name;}public int getAge() {	return age;}public void setAge(int age) {	this.age = age;}public String getName() {	return name;}public void setName(String name) {	this.name = name;}}

二、發(fā)布/訂閱消息模型實(shí)例

1、消息發(fā)布端

package mq;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;public class PSMQ {    public static void main(String[] args) throws JMSException {          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616");          Connection connection = factory.createConnection();          connection.start();          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          //創(chuàng)建話題        Topic topic = session.createTopic("myTopic.messages");          //為話題創(chuàng)建消息生產(chǎn)者        MessageProducer producer = session.createProducer(topic);          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);          while(true) {              TextMessage message = session.createTextMessage();              message.setText("message_" + System.currentTimeMillis());              producer.send(message);              System.out.println("Sent message: " + message.getText());          }      }  }2、消息訂閱端

package mq;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.MessageListener;import org.apache.activemq.ActiveMQConnectionFactory;public class PSAccept {	 public static void main(String[] args) throws JMSException {  	        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616");  	        Connection connection = factory.createConnection();  	        connection.start();  	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  	        //創(chuàng)建話題	        Topic topic = session.createTopic("myTopic.messages"); 	        //為話題創(chuàng)建消費(fèi)者	        MessageConsumer consumer = session.createConsumer(topic); 	        consumer.setMessageListener(new MessageListener() { 				@Override				public void onMessage(Message message) {	                TextMessage tm = (TextMessage) message;  	                try {  	                    System.out.println("Received message: " + tm.getText());  	                } catch (JMSException e) {  	                    e.printStackTrace();  	                }  				}  	        });  	    } }ok 三個(gè)demo很簡(jiǎn)單,在項(xiàng)目中導(dǎo)入jms相關(guān)的jar包,代碼復(fù)制過(guò)去就能跑了,在看看第一篇文章就大體上能明白各個(gè)類是干什么用的。

點(diǎn)對(duì)點(diǎn)消息模型和發(fā)布/訂閱消息模型兩種方式其實(shí)不同的就是使用隊(duì)列、還是使用話題創(chuàng)建目的地不同其他的都一樣。

connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");

其中第一個(gè)admin是用戶名第二個(gè)是密碼而第三個(gè)參數(shù)就是協(xié)議+ip+port(端口),這幾個(gè)參數(shù)兩個(gè)客戶端都是一樣的不然消費(fèi)端就獲取不到了……

說(shuō)到密碼我們順便來(lái)看看ActiveMQ訪問(wèn)密碼的設(shè)置

三、ActiveMQ訪問(wèn)密碼設(shè)置

在ActiveMQ的conf目錄的activemq.xml中添加賬號(hào)密碼

 <plugins>              <simpleAuthenticationPlugin>                  <users>                      <authenticationUser username="whd" passWord="123" groups="users,admins"/>                  </users>              </simpleAuthenticationPlugin>          </plugins> activemq.xml中添加位置:

ok這樣我們對(duì)這個(gè)ActiveMQ設(shè)置了一個(gè)用戶名密碼,所以在創(chuàng)建鏈接的時(shí)候要修改admin這個(gè)默認(rèn)的用戶名密碼為修改后的用戶名密碼。

connectionFactory = new ActiveMQConnectionFactory("whd", "123","tcp://192.168.0.104:61616");這樣我們就能正常的向服務(wù)器發(fā)送消息而消費(fèi)端也能從服務(wù)商消費(fèi)消息了……

差點(diǎn)忘了,還有一個(gè)ActiveMQ管理頁(yè)面地址:http://127.0.0.1:8161/admin/ 訪問(wèn)這個(gè)地址登陸管理頁(yè)面,默認(rèn)用戶名密碼都是admin

好了先就小高興一下吧,之后接著來(lái)深入學(xué)習(xí)……


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 泸州市| 新邵县| 灵台县| 绍兴市| 陆河县| 丽江市| 南靖县| 博湖县| 云南省| 洮南市| 皋兰县| 湘潭市| 家居| 石台县| 仪陇县| 老河口市| 都昌县| 峨眉山市| 朝阳县| 沭阳县| 永登县| 邹平县| 清流县| 田林县| 湘潭县| 双牌县| 平乡县| 南靖县| 武邑县| 大洼县| 扶沟县| 高邑县| 洪雅县| 上虞市| 长武县| 新晃| 阳朔县| 南涧| 平昌县| 温州市| 瑞安市|