第十六篇:ActiveMQ与Spring整合
2018-07-13 本文已影响19人
__y
1.Spring和ActiveMQ整合
第一步:在taotao-manager-services中引入jar包
image.png
<!-- Spring's JMS module; the org.springframework.jms.* classes used in the configuration below come from it. -->
<!-- NOTE(review): neither dependency declares a version; presumably it is inherited from a parent POM's dependencyManagement. Confirm. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- NOTE(review): the org.apache.activemq.* classes referenced by the Spring configuration are not part of these two artifacts; presumably the ActiveMQ client dependency is declared elsewhere. Verify. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
第二步:配置ActiveMQ整合spring
image.png
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side ActiveMQ wiring: a vendor connection factory, Spring's wrapper around it, a JmsTemplate, and one queue plus one topic destination. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- The real ConnectionFactory that actually creates connections; supplied by the JMS vendor (ActiveMQ here). -->
<!-- NOTE(review): the broker address is hard-coded; consider moving it to a properties file. -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.208.40:61616" />
</bean>
<!-- Spring's ConnectionFactory that wraps and manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- The target is the vendor ConnectionFactory that produces the actual JMS connections. -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Producer configuration. -->
<!-- JmsTemplate: Spring's JMS helper class for sending and receiving messages. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This refers to the Spring-provided connectionFactory bean defined above. -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- Queue destination (point-to-point), named "spring-queue". -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!-- Topic destination (one-to-many), named "item-change-topic". -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-change-topic" />
</bean>
</beans>
2.代码测试
/**
 * Sends a single text message to the {@code queueDestination} queue using the
 * {@link JmsTemplate} defined in the ActiveMQ Spring configuration.
 */
@Test
public void testQueueProducer() {
    // 1. Load the configuration. try-with-resources closes the context (and the beans it
    //    manages) after the send, instead of leaking it as the original did.
    try (ClassPathXmlApplicationContext applicationContext =
            new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
        // 2. Obtain the JmsTemplate from the container.
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 3. Obtain the destination to send to.
        Queue queue = (Queue) applicationContext.getBean("queueDestination");
        // 4. Send the message; the MessageCreator builds it from the session JmsTemplate supplies.
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring activemq test");
            }
        });
    }
}
接收的时候我们在taotao-search-service中接收
第一步:把Activemq相关的jar包添加到工程中(同上)
第二步:创建一个MessageListener的实现类。
**
* 接收ActiveMq消息
*/
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//取消息内容
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
第三步:配置spring和Activemq整合
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side ActiveMQ wiring: connection factories, destinations, and a listener container that delivers messages from the queue to MyMessageListener. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- The real ConnectionFactory that actually creates connections; supplied by the JMS vendor (ActiveMQ here). -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.208.40:61616" />
</bean>
<!-- Spring's ConnectionFactory that wraps and manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- The target is the vendor ConnectionFactory that produces the actual JMS connections. -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Queue destination (point-to-point), named "spring-queue". -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!-- Topic destination (one-to-many), named "item-change-topic". -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-change-topic" />
</bean>
<!-- Receiving messages. -->
<!-- The listener bean that handles each delivered message. -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
<!-- Message listener container: connects to queueDestination and hands incoming messages to myMessageListener. -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
如何测试?我们已经在配置文件中配置了监听器相关的bean,直接加载就好了
/**
 * Starts the Spring container so the configured message listener containers begin
 * consuming, then blocks until input arrives on standard input.
 *
 * @throws Exception if reading from standard input fails
 */
@Test
public void testQueueConsumer() throws Exception {
    // Initialise the Spring container; try-with-resources shuts it down once the wait ends,
    // instead of leaving the context open as the original did.
    try (ClassPathXmlApplicationContext applicationContext =
            new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
        // Keep the JVM alive so the listeners can receive messages.
        System.in.read();
    }
}
3.添加商品同步到索引库
3.1 生产者
这里我们使用topic(一对多)的消息传送形式,因为添加商品之后还涉及同步缓存、同步索引库、生成静态页面等操作,也就是说一条消息会有多个消费者。下面是配置
image.png
image.png
然后我们找到添加商品的方法,在添加完商品后,发送消息,这里需要考虑一个问题,那就是消息的内容应该是什么?既然是添加商品,消费者肯定是要知道添加的商品是哪个商品,同时本着简单的原则,我们只需要传新增商品的ID即可,如下图所示
image.png
完整的代码如下
/**
 * Item service: looks up items, lists them page by page, and adds new items.
 * After a new item is stored, its id is published to the {@code itemAddTopic} destination.
 */
@Service
public class ItemServiceImpl implements ItemService {

    @Autowired
    private TbItemMapper tbItemMapper;

    @Autowired
    private TbItemDescMapper itemDescMapper;

    @Autowired
    private JmsTemplate jmsTemplate;

    /** Topic on which the id of every newly added item is published. */
    @Resource(name = "itemAddTopic")
    private Destination itemAddTopic;

    /**
     * Loads one item by primary key.
     *
     * @param itemId the item id
     * @return whatever the mapper returns for that key
     */
    @Override
    public TbItem getItemById(long itemId) {
        return tbItemMapper.selectByPrimaryKey(itemId);
    }

    /**
     * Returns one page of items for the EasyUI data grid.
     *
     * @param page page number passed to PageHelper
     * @param rows page size passed to PageHelper
     * @return the total count together with the rows of the requested page
     */
    @Override
    public EasyUIDataGridResult getItemList(int page, int rows) {
        // 1. Configure paging before running the query (PageHelper static method).
        PageHelper.startPage(page, rows);
        // 2. Run the query with an empty example, i.e. no filter criteria.
        TbItemExample tbItemExample = new TbItemExample();
        List<TbItem> list = tbItemMapper.selectByExample(tbItemExample);
        // 3. Wrap the result in PageInfo to obtain the total number of records.
        PageInfo<TbItem> pageInfo = new PageInfo<>(list);
        EasyUIDataGridResult result = new EasyUIDataGridResult();
        result.setTotal(pageInfo.getTotal());
        result.setRows(list);
        return result;
    }

    /**
     * Stores a new item and its description, then publishes the new item id.
     *
     * @param item the item to insert; its id, timestamps and status are filled in here
     * @param desc the item description text
     * @return {@code TaotaoResult.ok()}
     */
    @Override
    public TaotaoResult addItem(TbItem item, String desc) {
        final long id = IDUtils.genItemId();
        // One timestamp so the item and its description carry identical created/updated times.
        final Date now = new Date();

        item.setId(id);
        item.setCreated(now);
        item.setUpdated(now);
        item.setStatus((byte) 1);
        tbItemMapper.insert(item);

        TbItemDesc itemDesc = new TbItemDesc();
        // Fix: link the description to its item. The original never set the item id, so the
        // description row could not be matched to the item by item_id.
        itemDesc.setItemId(id);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(now);
        itemDesc.setUpdated(now);
        itemDescMapper.insert(itemDesc);

        // Publish the new item id through ActiveMQ.
        // NOTE(review): if this method runs inside a transaction, the message is sent before
        // the commit, so a consumer may not see the rows yet - confirm how that is handled.
        jmsTemplate.send(itemAddTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(String.valueOf(id));
            }
        });
        return TaotaoResult.ok();
    }
}
3.2 消费者
我们的思路是,生产者传一个商品的id过来,消费者通过id查询到商品所需要的信息,再添加到索引库中去;所以我们要在Mapper中增加一个通过id查询商品的方法
image.png
image.png
完整代码
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.taotao.search.mapper.SearchItemMapper">

    <!-- Shared query: search-item columns for every item whose status is 1, joined with its
         category name and description. Kept in one place so both statements stay in sync. -->
    <sql id="searchItemQuery">
        SELECT
            a.id,
            a.title,
            a.sell_point,
            a.price,
            a.image,
            b.name AS category_name,
            c.item_desc
        FROM
            tb_item a
            LEFT JOIN tb_item_cat b ON a.cid = b.id
            LEFT JOIN tb_item_desc c ON a.id = c.item_id
        WHERE
            a.`status` = 1
    </sql>

    <!-- All items with status 1. -->
    <select id="getItemList" resultType="com.taotao.common.pojo.SearchItem">
        <include refid="searchItemQuery"/>
    </select>

    <!-- The single item with the given id, provided its status is 1. -->
    <select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
        <include refid="searchItemQuery"/>
        AND a.id = #{itemId}
    </select>
</mapper>
image.png
package com.taotao.search.listener;
import com.taotao.search.service.SearchItemService;
import com.taotao.search.service.SearchService;
import org.springframework.beans.factory.annotation.Autowired;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * JMS listener for item-added messages. The message body is the id of the new item;
 * the listener asks {@link SearchItemService} to add that item to the search index.
 */
public class ItemAddMessageListener implements MessageListener {

    @Autowired
    private SearchItemService searchItemService;

    /**
     * Handles one item-added message.
     *
     * @param message expected to be a {@link TextMessage} whose text is the item id;
     *                other message types are reported on standard error and skipped
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast instead of relying on the catch block to absorb a ClassCastException.
        if (!(message instanceof TextMessage)) {
            System.err.println("ItemAddMessageListener ignored unexpected message: " + message);
            return;
        }
        try {
            // Read the item id from the message body.
            String text = ((TextMessage) message).getText();
            long itemId = Long.parseLong(text);
            // The producer's transaction may not be committed yet, so wait briefly before
            // querying; otherwise the item may not be found.
            Thread.sleep(1000);
            // Look the item up and add it to the index.
            searchItemService.addDocument(itemId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
在SearchItemService中添加一个方法
image.png
完整代码
package com.taotao.search.service.impl;
import com.taotao.common.pojo.SearchItem;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.search.mapper.SearchItemMapper;
import com.taotao.search.service.SearchItemService;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Writes item data into the Solr index, either for all items at once or for a single item.
 */
@Service
public class SearchItemServiceImpl implements SearchItemService {

    /** SolrServer injected from the Spring configuration. */
    @Autowired
    private SolrServer solrServer;

    @Autowired
    private SearchItemMapper searchItemMapper;

    /**
     * Adds every item returned by {@code getItemList()} to the index and commits once.
     *
     * @return {@code TaotaoResult.ok()}
     * @throws Exception if the query or any Solr call fails
     */
    @Override
    public TaotaoResult addSerchItem() throws Exception {
        // Query all items.
        List<SearchItem> searchItemList = searchItemMapper.getItemList();
        // Add one document per item.
        for (SearchItem searchItem : searchItemList) {
            solrServer.add(toDocument(searchItem));
        }
        // Commit the changes.
        solrServer.commit();
        return TaotaoResult.ok();
    }

    /**
     * Adds a single item to the index and commits.
     *
     * @param itemId id of the item to index
     * @return {@code TaotaoResult.ok()}
     * @throws IllegalStateException if the mapper finds no item for {@code itemId}
     * @throws Exception if the query or any Solr call fails
     */
    @Override
    public TaotaoResult addDocument(Long itemId) throws Exception {
        // 1. Look the item up by id.
        SearchItem searchItem = searchItemMapper.getItemById(itemId);
        if (searchItem == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException("No indexable item found for id " + itemId);
        }
        // 2. Build the document and write it to the index.
        solrServer.add(toDocument(searchItem));
        // 3. Commit the change.
        solrServer.commit();
        // 4. Report success.
        return TaotaoResult.ok();
    }

    /** Maps one SearchItem onto the Solr fields used by the index. */
    private SolrInputDocument toDocument(SearchItem searchItem) {
        SolrInputDocument document = new SolrInputDocument();
        document.addField("id", searchItem.getId());
        document.addField("item_title", searchItem.getTitle());
        document.addField("item_sell_point", searchItem.getSell_point());
        document.addField("item_price", searchItem.getPrice());
        document.addField("item_image", searchItem.getImage());
        document.addField("item_category_name", searchItem.getCategory_name());
        document.addField("item_desc", searchItem.getItem_desc());
        return document;
    }
}
为什么我们要等待一秒呢?因为有可能商品添加的事务还没有提交,消息就已经发送过来,并且消费者马上消费了,这样会造成查询不到商品信息的问题,所以我们这里设置了一个等待时间。需要注意,固定的等待时间并不能完全保证事务已经提交,只是一种简单的处理方式。
Spring中配置监听
image.png完整配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Full consumer-side ActiveMQ wiring: connection factories, destinations, and two listener containers (queue to MyMessageListener, topic to ItemAddMessageListener). -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- The real ConnectionFactory that actually creates connections; supplied by the JMS vendor (ActiveMQ here). -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.208.40:61616" />
</bean>
<!-- Spring's ConnectionFactory that wraps and manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- The target is the vendor ConnectionFactory that produces the actual JMS connections. -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Queue destination (point-to-point), named "spring-queue". -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!-- Topic destination (one-to-many), named "item-change-topic". -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-change-topic" />
</bean>
<!-- Receiving messages. -->
<!-- Listener for the demo queue. -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
<!-- Message listener container: delivers messages from queueDestination to myMessageListener. -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<!-- Listener for item-added messages. -->
<bean id="itemAddMessageListener" class="com.taotao.search.listener.ItemAddMessageListener"/>
<!-- Message listener container: delivers messages from itemAddTopic to itemAddMessageListener. -->
<!-- NOTE(review): no durable-subscription settings appear here, so messages published while this consumer is offline are presumably not delivered. Confirm that this is acceptable. -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="itemAddTopic"/>
<property name="messageListener" ref="itemAddMessageListener"/>
</bean>
</beans>
4.测试结果
启动所有服务,然后添加商品
activeMq测试结果.png
然后在前台搜索"少儿",可以看到效果
activeMq2.png