Spring MVC 配置 RabbitMQ
2019-10-30 本文已影响0人
菜的无法无天
添加依赖
<!-- RabbitMQ Java client library (provides com.rabbitmq.client.Channel used by the consumer) -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
<!-- Spring AMQP core abstractions (Message, Queue, AcknowledgeMode, message converters) -->
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-amqp -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<!-- Spring AMQP RabbitMQ support (RabbitTemplate, RabbitAdmin, connection factory, listener containers) -->
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
添加配置类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
/**
 * Spring configuration that creates the RabbitMQ infrastructure beans.
 *
 * <p>Connection settings are read from {@code classpath:rabbitmq.properties}
 * through the injected {@link Environment}.
 */
@PropertySource("classpath:rabbitmq.properties")
@Configuration
public class RabbitmqConfig {
    @Autowired
    Environment env;

    /**
     * Builds the connection factory from the {@code rabbitmq.*} properties.
     *
     * @return a caching connection factory configured with host, port, user name and password
     * @throws IllegalStateException if {@code rabbitmq.port} is not defined
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        // The port is unboxed into an int constructor argument. getProperty would
        // return null for a missing key and fail with a bare NullPointerException;
        // getRequiredProperty fails with an exception that names the missing key.
        int port = env.getRequiredProperty("rabbitmq.port", Integer.class);
        CachingConnectionFactory factory = new CachingConnectionFactory(
                env.getProperty("rabbitmq.host"),
                port
        );
        factory.setUsername(env.getProperty("rabbitmq.username"));
        factory.setPassword(env.getProperty("rabbitmq.password"));
        return factory;
    }

    /**
     * Creates the template used to publish messages, converting payloads with
     * a {@link Jackson2JsonMessageConverter}.
     *
     * @param connectionFactory the connection factory the template sends through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    /**
     * Creates a listener container factory that uses the same JSON conversion
     * as the template.
     *
     * <p>NOTE(review): nothing in the visible code references this factory;
     * presumably it is meant for annotation-driven listeners - confirm.
     *
     * @param connectionFactory the connection factory containers will consume through
     * @return the configured container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    /**
     * Creates the {@link RabbitAdmin} bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the admin operates on
     * @return the admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}
辅助对象 Persion
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * Simple payload object sent through the queue by the producer.
 *
 * <p>Accessors, constructors, {@code equals}/{@code hashCode} and
 * {@code toString} are generated by the Lombok annotations below.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Persion implements Serializable {
    /**
     * Explicit serialization version, so the serialized form does not depend
     * on a compiler-computed default that changes whenever the class changes.
     */
    private static final long serialVersionUID = 1L;

    /** Name carried by the message. */
    private String name;

    /** Timestamp carried by the message. */
    private java.util.Date today;
}
消费者
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer configuration: registers two listener containers that both
 * consume from the {@code hello} queue using manual acknowledgement.
 *
 * <p>Each container prints the received JSON body to standard output with its
 * own prefix, so the output shows which container handled a given message.
 */
@Configuration
public class Receiver {
    /** Name of the queue both containers consume from. */
    private static final String QUEUE_NAME = "hello";

    /**
     * First consumer of the {@code hello} queue.
     *
     * @param connectionFactory the connection factory to consume through
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        return buildContainer(connectionFactory, "1 receive msg : ");
    }

    /**
     * Second consumer of the {@code hello} queue.
     *
     * @param connectionFactory the connection factory to consume through
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
        return buildContainer(connectionFactory, " 2 receive msg : ");
    }

    /**
     * Builds a single-consumer, manually-acknowledging container whose
     * listener prints each message body prefixed with {@code logPrefix}.
     */
    private static SimpleMessageListenerContainer buildContainer(ConnectionFactory connectionFactory,
                                                                 final String logPrefix) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(new Queue(QUEUE_NAME));
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        // Manual acknowledgement: the listener itself must ack or reject every delivery.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                JSONObject payload;
                try {
                    // Decode explicitly as UTF-8 rather than with the platform default
                    // charset, so non-ASCII text is read the same on every machine.
                    payload = JSONObject.parseObject(
                            new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                } catch (RuntimeException e) {
                    // An unparseable body would otherwise stay unacknowledged on this
                    // channel. Reject it without requeue (basicReject(tag, true) would
                    // put it back on the queue instead), then surface the failure.
                    channel.basicReject(deliveryTag, false);
                    throw e;
                }
                System.out.println(logPrefix + payload);
                // multiple=false acknowledges only this delivery; multiple=true would
                // also acknowledge every earlier unacknowledged delivery on the channel.
                channel.basicAck(deliveryTag, false);
            }
        });
        return container;
    }
}
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * Producer that publishes a fixed batch of demo messages.
 */
@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends 20 {@code Persion} payloads, named {@code zhangsn0} to
     * {@code zhangsn19}, using {@code "hello"} as the routing key. Each
     * payload carries the time at which it was created.
     */
    public void send() {
        int index = 0;
        while (index < 20) {
            Persion payload = new Persion("zhangsn" + index, new Date());
            rabbitTemplate.convertAndSend("hello", payload);
            index++;
        }
    }
}
rabbitmq.properties 配置文件
# Host name or IP address of the RabbitMQ broker
rabbitmq.host=39.108.181.14
# Port the connection factory connects to
rabbitmq.port=5672
# User name used to authenticate with the broker
# NOTE(review): RabbitMQ's built-in "guest" account is limited to localhost
# connections by default - confirm the remote broker above allows it.
rabbitmq.username=guest
# Password used to authenticate with the broker
# NOTE(review): differs from the stock "guest" password - confirm this is intentional.
rabbitmq.password=guestv
测试控制器
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Test controller of the rabbitmq demo: exposes an endpoint that triggers
 * the message producer.
 *
 * @author 天问
 * @version 1.0
 * @since 2019-01-15
 */
@RestController
public class TestController2 {
    @Autowired
    private Sender sender;

    /**
     * Handles requests to {@code /add} by asking the {@link Sender} to publish
     * its batch of messages.
     *
     * @return the literal string {@code "ok"} once the sender has returned
     */
    @RequestMapping(value = "/add")
    public String addGuess() {
        this.sender.send();
        final String response = "ok";
        return response;
    }
}