spring整合rabbitMq之topic模式
说明:此模式是在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配:
符号#:匹配零个或者多个词,cn.# 可以匹配cn、cn.新闻或者cn.新闻.国内新闻
符号*:只能匹配一个词,cn.* 可以匹配cn.新闻或者cn.体育,但不能匹配us.新闻或者cn.新闻.国内新闻
1、maven依赖
<!-- Maven coordinates of the spring-rabbit artifact (group org.springframework.amqp), pinned to 2.0.5.RELEASE -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
2、spring-rabbit 编写
2.1、生产者:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- Producer-side context: connection factory, topic exchange, RabbitTemplate, the NewsProducer bean and a RabbitAdmin. -->
<!-- Connection factory with the basic broker parameters (host, port, credentials, virtual host) -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
virtual-host="/test"></rabbit:connection-factory>
<!--
fanout-exchange | direct-exchange | topic-exchange
Declares a topic exchange named topicExchange; if the exchange does not exist yet it is created automatically.
-->
<rabbit:topic-exchange name="topicExchange" auto-declare="true">
</rabbit:topic-exchange>
<!-- Spring provides the RabbitTemplate object to simplify sending from the producer side; it wraps the commonly used methods.
     This template is configured with topicExchange as its exchange. -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template>
<!-- Inject the template into the producer bean, which uses it to send data -->
<bean id="newsProducer" class="com.liming.rabbit.producer.NewsProducer">
<property name="rabbitTemplate" ref="template"/>
</bean>
<!-- The RabbitAdmin object is used to create, bind and manage queues and exchanges -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
</beans>
package com.liming.rabbit.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.liming.rabbit.entity.News;
import java.util.Date;
/**
 * Publishes {@link News} messages through an injected {@link RabbitTemplate}.
 *
 * <p>The template is supplied through {@link #setRabbitTemplate(RabbitTemplate)}; the
 * producer XML in this tutorial wires it from the bean named {@code template}.
 */
public class NewsProducer {

    /** Template used for every send; stays {@code null} until the setter is called. */
    private RabbitTemplate rabbitTemplate = null;

    /**
     * @return the template currently used for sending, or {@code null} if none was injected
     */
    public RabbitTemplate getRabbitTemplate() {
        return rabbitTemplate;
    }

    /**
     * @param rabbitTemplate the template to use for all subsequent sends
     */
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Converts {@code news} to a message and sends it with the given routing key.
     *
     * @param routingKey routing key attached to the message, e.g. {@code "us.20190101"}
     * @param news       the payload object handed to the template for conversion
     */
    public void sendNews(String routingKey, News news) {
        // Two-argument convertAndSend: first argument is the routing key, second is
        // the object to send. No exchange is named here, so the one configured on
        // the template is used.
        rabbitTemplate.convertAndSend(routingKey, news);
        System.out.println("新闻发送成功");
    }

    /**
     * Demo entry point: loads the Spring context, sends two sample news items and
     * then closes the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the context once both messages are sent, so the
        // beans it created (including the connection factory) are released instead
        // of being left open after this one-shot producer finishes.
        try (ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("applicatonContext.xml")) {
            NewsProducer producer = (NewsProducer) applicationContext.getBean("newsProducer");
            // Argument order follows News(source, title, createTime, content).
            producer.sendNews("us.20190101",
                    new News("新华社", "国际新闻", new Date(), "国际新闻内容:xxxx"));
            producer.sendNews("cn.20190101",
                    new News("凤凰tv", "国内新闻", new Date(), "国内新闻内容:yyyy"));
        }
    }
}
2.2、消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- Consumer-side context: connection factory, RabbitAdmin, queue, exchange binding, listener container and the consumer bean. -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
virtual-host="/test"></rabbit:connection-factory>
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!-- Declare the queue -->
<rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/>
<!-- Bind the queue to the exchange and specify the filter pattern (us.*) -->
<rabbit:topic-exchange name="topicExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- Once the consumer is started, Spring listens on topicQueue under the hood; whenever a new message arrives
     it is passed to the News parameter of the consumer bean's recv method,
     where the program then processes the News further -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/>
</rabbit:listener-container>
<bean id="consumer" class="com.liming.rabbit.consumer.NewsConsumer"></bean>
</beans>
package com.liming.rabbit.consumer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.liming.rabbit.entity.News;
/**
 * Receiving side of the news demo: prints every {@link News} handed to
 * {@link #recv(News)}.
 */
public class NewsConsumer {

    /**
     * Entry point: loads the Spring context from the classpath and returns.
     * The context is intentionally not closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicatonContext.xml");
    }

    /**
     * Prints the received news item to standard output.
     *
     * @param news the news item to print
     */
    public void recv(News news) {
        String description = news.toString();
        System.out.println("接收到最新新闻:" + description);
    }
}
2.3、发送消息的实体类
package com.liming.rabbit.entity;
import java.io.Serializable;
import java.util.Date;
/**
 * Serializable news item exchanged between the producer and the consumer.
 *
 * <p>Plain mutable bean: every field has a getter and a setter and no validation
 * is performed.
 */
public class News implements Serializable {

    /**
     * Explicit serialization version. Without it the JVM derives one from the class
     * shape, which can differ between separately compiled producer and consumer
     * builds and make deserialization fail with {@code InvalidClassException}.
     */
    private static final long serialVersionUID = 1L;

    /** Where the news item comes from. */
    private String source;
    /** Headline of the news item. */
    private String title;
    /** Creation timestamp of the news item. */
    private Date createTime;
    /** Body text of the news item. */
    private String content;

    /**
     * Creates a news item with all fields set.
     *
     * @param source     where the news item comes from
     * @param title      headline
     * @param createTime creation timestamp (stored as given, not copied)
     * @param content    body text
     */
    public News(String source, String title, Date createTime, String content) {
        this.source = source;
        this.title = title;
        this.createTime = createTime;
        this.content = content;
    }

    /** @return the headline */
    public String getTitle() {
        return title;
    }

    /** @param title the new headline */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return the creation timestamp (the stored instance, not a copy) */
    public Date getCreateTime() {
        return createTime;
    }

    /** @param createTime the new creation timestamp */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the body text */
    public String getContent() {
        return content;
    }

    /** @param content the new body text */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return where the news item comes from */
    public String getSource() {
        return source;
    }

    /** @param source the new source */
    public void setSource(String source) {
        this.source = source;
    }

    /**
     * @return a single-line rendering of all four fields in the form
     *         {@code News [source=..., title=..., createTime=..., content=...]}
     */
    @Override
    public String toString() {
        return "News [source=" + source + ", title=" + title + ", createTime=" + createTime + ", content=" + content
                + "]";
    }
}
3、运行效果