国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

Java ActiveMQ 講解(二)Spring ActiveMQ整合+注解消息監(jiān)聽

2019-11-15 01:05:06
字體:
供稿:網(wǎng)友
java ActiveMQ 講解(二)SPRing ActiveMQ整合+注解消息監(jiān)聽

對(duì)于ActiveMQ消息的發(fā)送,原聲的api操作繁瑣,而且如果不進(jìn)行二次封裝,打開關(guān)閉會(huì)話以及各種創(chuàng)建操作也是夠夠的了。那么,Spring提供了一個(gè)很方便的去收發(fā)消息的框架,spring jms。整合Spring后,代碼不僅變得非常優(yōu)雅,而且易用性和擴(kuò)展性更好。

廢話不多說,直接開搞。

1. maven依賴
        <!-- activemq -->        <dependency>            <groupId>org.apache.xbean</groupId>            <artifactId>xbean-spring</artifactId>            <version>3.16</version>        </dependency>        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-jms</artifactId>            <version>${springframework.version}</version>        </dependency>        <dependency>            <groupId>org.apache.activemq</groupId>            <artifactId>activemq-all</artifactId>            <version>${activemq.version}</version>        </dependency>

2.命名空間引入
<?xml version="1.0" encoding="UTF-8"?><!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --><beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"    xmlns:jms="http://www.springframework.org/schema/jms"    xsi:schemaLocation="http://www.springframework.org/schema/beans           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd        http://www.springframework.org/schema/context           http://www.springframework.org/schema/context/spring-context-3.2.xsd        http://www.springframework.org/schema/jms        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd        http://activemq.apache.org/schema/core        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

3. Xml配置
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.Word="${activemq.password}" />    <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory">        <constructor-arg ref="jmsConnectionFactory" />        <property name="sessionCacheSize" value="100" />    </bean>    <!-- 消息處理器 -->    <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />    <!-- ====Producer side start==== -->    <!-- 定義JmsTemplate的Queue類型 -->    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">        <constructor-arg ref="jmsConnectionFactoryExtend" />        <!-- 非pub/sub模型(發(fā)布/訂閱),即隊(duì)列模式 -->        <property name="pubSubDomain" value="false" />        <property name="messageConverter" ref="jmsMessageConverter"></property>    </bean>    <!-- 定義JmsTemplate的Topic類型 -->    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">        <constructor-arg ref="jmsConnectionFactoryExtend" />        <!-- pub/sub模型(發(fā)布/訂閱) -->        <property name="pubSubDomain" value="true" />        <property name="messageConverter" ref="jmsMessageConverter"></property>    </bean>    <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10">        <jms:listener destination="testqueue" ref="queueReciver" />    </jms:listener-container>    

第一個(gè)是配置我們的mq連接,ip+端口號(hào),帳號(hào)密碼的信息。

第二個(gè)是引入spring的mq連接池。可以配置緩存的連接數(shù)。

第三個(gè)是消息處理器,Spring默認(rèn)提供了基于Jdk Serializable的消息處理和MappingJackson2MessageConventer,其實(shí)這兩個(gè)挺常用,在Spring Redis中,在Spring MVC中,都有著這幾種conventer的身影。

下面是兩個(gè)發(fā)送消息的模版類,類似于之前講過的RedisTemplate。向其注入上面定義的消息處理器,代碼中我們會(huì)用到。(其實(shí)類中已經(jīng)判斷如果不進(jìn)行注入就設(shè)置一個(gè)默認(rèn)的,但是自己注入的話,方便我們控制)

listener-container是Spring提供的一個(gè)監(jiān)聽器容器,用于統(tǒng)一控制我們的監(jiān)聽類來接收處理消息。這里面有一些配置,schema有說明。可以配置響應(yīng)模式,消費(fèi)者數(shù)量等。開啟多消費(fèi)者,有助于加快隊(duì)列處理速度。

4.注解方式的實(shí)現(xiàn)

如果要用注解的方式,就不需要在xml中自己定義消息監(jiān)聽容器了。只需要加入以下的代碼:

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">        <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>    </bean>        <!-- 監(jiān)聽注解支持 -->    <jms:annotation-driven/>

這樣,配置我們消費(fèi)處理類上的@listener注解,即可監(jiān)聽對(duì)應(yīng)的queue或者topic消息。

5.生產(chǎn)者代碼

隊(duì)列消息:

@Resource@Component("queueSender")public class QueueSender {    @Resource(name = "jmsQueueTemplate")    private JmsTemplate jmsQueueTemplate;// 通過@Qualifier修飾符來注入對(duì)應(yīng)的bean        public void send(String destination, final Object message) {        jmsQueueTemplate.send(destination, new MessageCreator() {            @Override            public Message createMessage(Session session) throws JMSException {                return jmsQueueTemplate.getMessageConverter().toMessage(message, session);            }        });    }}

訂閱消息:

@Componentpublic class TopicSender {        @Resource(name="jmsTopicTemplate")    private JmsTemplate jmsTemplate;            /**     * 發(fā)送一條消息到指定的隊(duì)列(目標(biāo))     * @param queueName 隊(duì)列名稱     * @param message 消息內(nèi)容     */    public void publish(String destination,final Object message){        jmsTemplate.send(destination, new MessageCreator() {            @Override            public Message createMessage(Session session) throws JMSException {                return jmsTemplate.getMessageConverter().toMessage(message, session);            }        });    }}

6.消費(fèi)者代碼
package cn.test.activemq.consumer.queue;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.annotation.JmsListener;import org.springframework.jms.listener.adapter.MessageListenerAdapter;import org.springframework.jms.support.converter.MessageConversionException;import org.springframework.stereotype.Component;import cn.test.MqBean;import cn.test.activemq.message.types.QueueDefination;/** * @author Han */@Component("spqueueconsumertest")public class SpringQueueReciverTest extends MessageListenerAdapter{    private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);                @JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")    public void onMessagehehe(Message message, Session session) throws JMSException {        try {            MqBean bean = (MqBean) getMessageConverter().fromMessage(message);            System.out.println(bean.getName());            System.out.println(session);            message.acknowledge();            message.acknowledge();        } catch (MessageConversionException | JMSException e) {            e.printStackTrace();        }            }    }

上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用注解方式監(jiān)聽的時(shí)候加入。如果用xml配置容易,可以忽略。

附上MqBean

public class MqBean implements Serializable{    private Integer age;    private String name;    public Integer getAge() {        return age;    }    public void setAge(Integer age) {        this.age = age;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    }

運(yùn)行效果截圖:

ActiveMQ的基本用法大概就這些了。后續(xù)如果有新的發(fā)現(xiàn)包括優(yōu)化發(fā)面的,再繼續(xù)發(fā)吧。


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 贵德县| 时尚| 蛟河市| 湄潭县| 会昌县| 商城县| 确山县| 叙永县| 资溪县| 沅江市| 梁山县| 江陵县| 平原县| 蓬安县| 井冈山市| 神农架林区| 庆阳市| 襄樊市| 长宁县| 泰兴市| 西藏| 永顺县| 运城市| 信宜市| 石楼县| 昌宁县| 泊头市| 石河子市| 报价| 鸡泽县| 西乡县| 应城市| 武威市| 巍山| 德安县| 寿阳县| 东城区| 二连浩特市| 大邑县| 乌拉特中旗| 文成县|