Java框架搭建

第十六篇:ActiveMQ与Spring整合

2018-07-13  本文已影响19人  __y

1.Spring和ActiveMQ整合

第一步:在taotao-manager-services中引入jar包


image.png
<!-- Spring JMS module: provides JmsTemplate and the message listener containers used below.
     No <version> is given here; presumably it is managed by a parent POM (TODO confirm). -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<!-- Spring context-support module, added alongside spring-jms. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>

第二步:配置ActiveMQ整合spring


image.png
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side ActiveMQ wiring: connection factories, JmsTemplate and the two destinations.
     Only the beans namespace is declared because no other namespace is used in this file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <!-- The vendor (ActiveMQ) ConnectionFactory that actually opens connections to the broker. -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.208.40:61616" />
    </bean>

    <!-- Spring-managed ConnectionFactory wrapping the vendor factory.
         CachingConnectionFactory is a SingleConnectionFactory subclass that additionally caches
         Sessions and MessageProducers and reconnects after a connection exception, which is what
         JmsTemplate needs to avoid re-creating them on every send. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The real factory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- Producer: Spring's JMS helper used to send (and receive) messages. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-managed connection factory defined above. -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue" />
    </bean>

    <!-- Topic destination (publish/subscribe, one message to many consumers). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic" />
    </bean>

</beans>

2.代码测试

 @Test
    public void testQueueProducer() {
        // Smoke test: sends one text message to the "queueDestination" bean through JmsTemplate.
        // The context is opened in try-with-resources so it is closed (and its beans destroyed)
        // when the test finishes; previously it was never closed.
        try (ClassPathXmlApplicationContext applicationContext =
                     new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
            // Look up the template by type and the destination by bean name.
            JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
            Queue queue = applicationContext.getBean("queueDestination", Queue.class);
            // JmsTemplate hands us a Session; we only have to build the message.
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("spring activemq test");
                }
            });
        }
    }

接收的时候我们在taotao-search-service中接收
第一步:把Activemq相关的jar包添加到工程中(同上)
第二步:创建一个MessageListener的实现类。

**
 * 接收ActiveMq消息
 */
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();

        }
    }
}

第三步:配置spring和Activemq整合

<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side ActiveMQ wiring: connection factories, destinations and one queue listener. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <!-- Vendor (ActiveMQ) factory that actually opens connections to the broker. -->
    <bean id="targetConnectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory"
          p:brokerURL="tcp://192.168.208.40:61616"/>

    <!-- Spring-managed factory that wraps the vendor factory above. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory"
          p:targetConnectionFactory-ref="targetConnectionFactory"/>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>

    <!-- Topic destination (publish/subscribe). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic"/>
    </bean>

    <!-- Receiving side: the listener bean itself. -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>

    <!-- Listener container wiring the queue destination to myMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"
          p:connectionFactory-ref="connectionFactory"
          p:destination-ref="queueDestination"
          p:messageListener-ref="myMessageListener"/>
</beans>

如何测试?我们已经在配置文件中配置了监听器相关的bean,直接加载就好了

 @Test
    public void testQueueConsumer() throws Exception {
        // Starting the Spring context is enough: the listener beans are declared in the XML file.
        // try-with-resources closes the context once the test is released; previously it was
        // never closed.
        try (ClassPathXmlApplicationContext applicationContext =
                     new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
            // Block on standard input so the test stays alive while messages arrive.
            System.in.read();
        }
    }

3.添加商品同步到索引库

3.1 生产者

这里我们用的是topic一对多的消息传送形式,因为添加商品涉及到同步缓存、同步索引库、添加静态页面等操作,也就是一个消息会有多个消费者。下面是配置


image.png
image.png

然后我们找到添加商品的方法,在添加完商品后,发送消息,这里需要考虑一个问题,那就是消息的内容应该是什么?既然是添加商品,消费者肯定是要知道添加的商品是哪个商品,同时本着简单的原则,我们只需要传新增商品的ID即可,如下图所示


image.png
完整的代码如下
/**
 * Item service backed by the MyBatis item mappers. In addition to the read operations,
 * {@link #addItem} publishes the id of each newly inserted item to the "itemAddTopic"
 * JMS destination so that other services can react to it.
 */
@Service
public class ItemServiceImpl implements ItemService {
    @Autowired
    private TbItemMapper tbItemMapper;
    @Autowired
    private TbItemDescMapper itemDescMapper;
    @Autowired
    private JmsTemplate jmsTemplate;
    // Injected by bean name: the destination bean called "itemAddTopic".
    @Resource(name = "itemAddTopic")
    private Destination itemAddTopic;

    /**
     * Looks up a single item by primary key.
     *
     * @param itemId primary key of the item
     * @return whatever the mapper returns for that key
     */
    @Override
    public TbItem getItemById(long itemId) {
        return tbItemMapper.selectByPrimaryKey(itemId);
    }

    /**
     * Returns one page of items in the shape expected by the EasyUI data grid.
     *
     * @param page page number passed to PageHelper
     * @param rows page size passed to PageHelper
     * @return total count plus the rows of the requested page
     */
    @Override
    public EasyUIDataGridResult getItemList(int page, int rows) {
        // The paging settings must be registered before the query is executed.
        PageHelper.startPage(page, rows);
        List<TbItem> items = tbItemMapper.selectByExample(new TbItemExample());
        // PageInfo exposes the total row count of the paged query.
        PageInfo<TbItem> pageInfo = new PageInfo<>(items);

        EasyUIDataGridResult result = new EasyUIDataGridResult();
        result.setTotal(pageInfo.getTotal());
        result.setRows(items);
        return result;
    }

    /**
     * Inserts a new item together with its description and then announces the new
     * item id on the "itemAddTopic" destination.
     *
     * @param item the item to insert; its id, timestamps and status are overwritten here
     * @param desc the item description text
     * @return {@code TaotaoResult.ok()} after both inserts and the send have completed
     */
    @Override
    public TaotaoResult addItem(TbItem item, String desc) {
        final long id = IDUtils.genItemId();
        // One timestamp for both rows so created/updated agree exactly.
        Date now = new Date();

        item.setId(id);
        item.setCreated(now);
        item.setUpdated(now);
        item.setStatus((byte) 1);
        tbItemMapper.insert(item);

        TbItemDesc itemDesc = new TbItemDesc();
        // Link the description row to the item; without this the row has no item id.
        itemDesc.setItemId(id);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(now);
        itemDesc.setUpdated(now);
        itemDescMapper.insert(itemDesc);

        sendItemAddedMessage(id);
        return TaotaoResult.ok();
    }

    /**
     * Publishes the id of a newly added item as a text message.
     * NOTE(review): this runs inside addItem, so if addItem is transactional the message
     * may reach consumers before the commit is visible; confirm how consumers cope.
     */
    private void sendItemAddedMessage(final long id) {
        jmsTemplate.send(itemAddTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(id + "");
            }
        });
    }
}

3.2 消费者

我们的思路是,生产者传一个商品的id过来,消费者通过这个id查询到商品所需要的信息,再添加到索引库中去;所以我们要在Mapper中增加一个通过id查询商品的方法


image.png
image.png

完整代码

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- MyBatis statements for com.taotao.search.mapper.SearchItemMapper. Both statements
     return the same columns and map each row onto com.taotao.common.pojo.SearchItem. -->
<mapper namespace="com.taotao.search.mapper.SearchItemMapper">

    <!-- Selects every item whose status is 1, left-joined with its category name
         (tb_item_cat) and its description (tb_item_desc). -->
    <select id="getItemList" resultType="com.taotao.common.pojo.SearchItem">
        SELECT
        a.id,
        a.title,
        a.sell_point,
        a.price,
        a.image,
        b. NAME category_name,
        c.item_desc
        FROM
        tb_item a
        LEFT JOIN tb_item_cat b ON a.cid = b.id
        LEFT JOIN tb_item_desc c ON a.id = c.item_id
        WHERE
        a.`status` = 1
    </select>
    <!-- Same projection and joins as getItemList, restricted to the item whose id equals
         the long parameter. Yields no row when the item does not exist or its status is not 1. -->
    <select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
        SELECT
        a.id,
        a.title,
        a.sell_point,
        a.price,
        a.image,
        b. NAME category_name,
        c.item_desc
        FROM
        tb_item a
        LEFT JOIN tb_item_cat b ON a.cid = b.id
        LEFT JOIN tb_item_desc c ON a.id = c.item_id
        WHERE
        a.`status` = 1
        AND
        a.id = #{itemId}
    </select>

</mapper>
image.png
package com.taotao.search.listener;

import com.taotao.search.service.SearchItemService;
import com.taotao.search.service.SearchService;
import org.springframework.beans.factory.annotation.Autowired;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Listens for "item added" messages and adds the announced item to the search index.
 * The message body is expected to be the item id as text.
 */
public class ItemAddMessageListener implements MessageListener {
    @Autowired
    private SearchItemService searchItemService;

    /**
     * Parses the item id from the message, waits briefly, then indexes the item.
     * Failures are printed and not rethrown, matching the original best-effort behaviour.
     *
     * @param message the JMS message delivered by the listener container
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise fail with ClassCastException.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text JMS message: " + message);
            return;
        }
        try {
            // The message body carries the item id.
            String text = ((TextMessage) message).getText();
            Long itemId = Long.valueOf(text);
            // Wait for the producer's transaction to commit, otherwise the item may not
            // be found by the query yet.
            Thread.sleep(1000);
            // Query the item and add it to the index.
            searchItemService.addDocument(itemId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在SearchItemService中添加一个方法


image.png

完整代码

package com.taotao.search.service.impl;

import com.taotao.common.pojo.SearchItem;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.search.mapper.SearchItemMapper;
import com.taotao.search.service.SearchItemService;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
/**
 * Writes items into the Solr index: either every item returned by the mapper
 * ({@link #addSerchItem}) or a single item by id ({@link #addDocument}).
 */
@Service
public class SearchItemServiceImpl implements SearchItemService {
    // The SolrServer instance is declared as a bean and injected here.
    @Autowired
    private SolrServer solrServer;
    @Autowired
    private SearchItemMapper searchItemMapper;

    /**
     * Indexes every item returned by {@code SearchItemMapper.getItemList()} and commits once.
     *
     * @return {@code TaotaoResult.ok()} after the commit
     * @throws Exception if the query or any Solr call fails
     */
    @Override
    public TaotaoResult addSerchItem() throws Exception {
        List<SearchItem> searchItemList = searchItemMapper.getItemList();
        for (SearchItem searchItem : searchItemList) {
            solrServer.add(toDocument(searchItem));
        }
        // A single commit after all documents have been added.
        solrServer.commit();
        return TaotaoResult.ok();
    }

    /**
     * Indexes the single item identified by {@code itemId}.
     *
     * @param itemId id of the item to index
     * @return {@code TaotaoResult.ok()} after the commit
     * @throws IllegalStateException if the mapper finds no row for {@code itemId}
     * @throws Exception if the query or any Solr call fails
     */
    @Override
    public TaotaoResult addDocument(Long itemId) throws Exception {
        SearchItem searchItem = searchItemMapper.getItemById(itemId);
        // The mapper query can come back empty (e.g. the row is not visible yet); fail with
        // a descriptive error instead of a NullPointerException further down.
        if (searchItem == null) {
            throw new IllegalStateException("No indexable item found for id " + itemId);
        }
        solrServer.add(toDocument(searchItem));
        solrServer.commit();
        return TaotaoResult.ok();
    }

    /** Copies the fields of one SearchItem into a new SolrInputDocument. */
    private SolrInputDocument toDocument(SearchItem searchItem) {
        SolrInputDocument document = new SolrInputDocument();
        document.addField("id", searchItem.getId());
        document.addField("item_title", searchItem.getTitle());
        document.addField("item_sell_point", searchItem.getSell_point());
        document.addField("item_price", searchItem.getPrice());
        document.addField("item_image", searchItem.getImage());
        document.addField("item_category_name", searchItem.getCategory_name());
        document.addField("item_desc", searchItem.getItem_desc());
        return document;
    }
}

为什么我们要等待一秒呢?因为有可能商品添加的事务还没有提交,消息就已经传递过来,并且消费者马上消费了,这样会造成一个问题:查询不到商品的信息。所以我们一般会设置一个等待时间

Spring中配置监听

image.png

完整配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side ActiveMQ wiring: connection factories, destinations, a queue listener
     and the topic listener that reacts to newly added items. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <!-- Vendor (ActiveMQ) factory that actually opens connections to the broker. -->
    <bean id="targetConnectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory"
          p:brokerURL="tcp://192.168.208.40:61616"/>

    <!-- Spring-managed factory that wraps the vendor factory above. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory"
          p:targetConnectionFactory-ref="targetConnectionFactory"/>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>

    <!-- Topic destination (publish/subscribe). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic"/>
    </bean>

    <!-- Queue listener bean. -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>

    <!-- Listener container wiring the queue destination to myMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"
          p:connectionFactory-ref="connectionFactory"
          p:destination-ref="queueDestination"
          p:messageListener-ref="myMessageListener"/>

    <!-- Topic listener bean for "item added" messages. -->
    <bean id="itemAddMessageListener" class="com.taotao.search.listener.ItemAddMessageListener"/>

    <!-- Listener container wiring the topic destination to itemAddMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"
          p:connectionFactory-ref="connectionFactory"
          p:destination-ref="itemAddTopic"
          p:messageListener-ref="itemAddMessageListener"/>

</beans>

4.测试结果

启动所有服务,然后添加商品


activeMq测试结果.png

然后在前台搜索“少儿”,可以看到效果


activeMq2.png
上一篇下一篇

猜你喜欢

热点阅读