SpringBoot整合RabbitMQ(消息中间件)

2020-05-14  本文已影响0人  索性流年

消息中间件概述

消息队列中间件是分布式架构中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性框架目前使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RockerMQ。

RabbitMQ 作用

生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。

交换机的作用

根据具体的路由策略分发到不同的队列中。

交换机的四种类型

1.Direct exchange (直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的

2.Fanout exchange (扇型交换机)将消息路由给绑定到它身上的所有队列

3.Topic exchange ( 主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路
由给一个或多个绑定队列

4.Headers exchange (头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。
通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

VirtualHost

像MySQL服务器中可以添加多个数据库一样,可以指定用户对指定库表等操作权限,RabbitMQ中这种管理权限就是VirtualHost,每个VirtualHost相当于一个独立的服务器,每个VirtualHost之前相互隔离message、queue不能互通,目的是为了解耦合

RabbitMQ消息队列的类型

1.点对点模式: 一对一模式一个生产者投递消息给队列,只能允许有一个消费者进行消费。如若消费者集群,会进行均摊消费

2.工作模式:又称公平消费模式,采用能者多劳的原则,哪个消费者应答的快,哪个就能多消费消息,当消费者没有应答之前,队列将不会再发送新的消息给消费者

3.发布订阅模式:一个生产者发送消息,多个消费者获取同样的消息,包括一个生产者,一个交换机,多个队列,多个消费者。

4.路由模式(RoutingKey):

5.通配符模式(topic):生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
"*"表示匹配一个词语,"#"表示匹配多个词语

自动应答

不在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息。如果处理消息失败情况下,实现自动补偿。

手动应答

消费处理完业务逻辑,手动返回ack(通知),告诉队列服务器是否删除消息

pom导入依赖


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: suoxingliunian
    password: a1234560
    virtual-host: /admin_host

消息生产者

创建.class配置类

/*发布订阅模式配置交换机类型为Fanou*/
@Configuration
public class FanoutConfig {
    //邮件队列
    private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
    //短信队列
    private String FANOUT_SMS_QUEUE = " fanout_sms_queue" ;
    //交换机名称
    private String EXCHANGE_NAME = "fanoutExchange";


//  定义邮件队列
    @Bean
    public Queue fanoutEamilQueue(){
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    //    定义短信队列
    @Bean
    public Queue fanoutSmsQueue(){
        return new Queue(FANOUT_SMS_QUEUE);
    }

    //    定义交换机名称 Fanout类型  其他类型例如TopicExchange
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE_NAME);
    }

    //    邮件队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
    @Bean
    Binding bindingExchangeEamil(Queue fanoutEamilQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutEamilQueue).to(fanoutExchange);
    }

    //    短息队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
    @Bean
    Binding bindingExchangeSms(Queue fanoutSmsQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
    }
}

创建.class消息发布消息


@Component
public class FanoutProducer {
    @Autowired
    public AmqpTemplate amqpTemplate;
    public void send(String queueName) {
        String msg = "sendmsg"+new Date();
//        发送消息
        amqpTemplate.convertAndSend(queueName,msg);
    }
}


创建Controller模拟消息发布接口


@RestController
public class ProducerController {
    @Autowired
    private FanoutProducer producer;

    @GetMapping("/sendMsg")
    public void sendMsg(String queueName) {
        producer.send(queueName);
    }
}

消息消费者

工厂方法


/*工厂方法*/
public class Producer {
    private static final String queueName = "sanshengsanshishilitaohua";

    public static void main(String[] args) throws IOException, TimeoutException {
//        获取新连接
        Connection connection = RabbitMQUtils.newConntction();
//        创建通道
        Channel channel = connection.createChannel();
//        创建队列
        channel.queueDeclare(queueName, false, false, false, null);
//        发送消息
        channel.basicPublish("", queueName, null, "suoxingliunian".getBytes());
//        关闭资源
        channel.close();
        connection.close();
    }
}

工具类

public class RabbitMQUtils {

    //    创建RabbitMQ连接
    public static Connection newConntction() throws IOException, TimeoutException {
        //        创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //        设置连接地址
        connectionFactory.setHost("127.0.0.1");
        //        设置连接用户名
        connectionFactory.setUsername("admin");
        //        设置密码
        connectionFactory.setPassword("123456");
        //        设置端口号
        connectionFactory.setPort(5672);
        //        设置VirtualHost地址
        connectionFactory.setVirtualHost("/admin_host");
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

}

消费者

/*消息消费者*/
public class Consumer {
    private static final String queueName = "sanshengsanshishilitaohua";

    public static void main(String[] args) throws IOException, TimeoutException {
//        获取新连接
        Connection connection = RabbitMQUtils.newConntction();
//        创建通道
         Channel channel = connection.createChannel();
//        消费者关联队列
//        channel.queueDeclare(queueName, false, false, false, null);
//        消费者获取消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//            监听获取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String result = new String(body,"utf-8");
                System.err.println("消费者获取生产者发送消息"+result);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
//        设置应答模式
        channel.basicConsume(queueName,true,defaultConsumer);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读