RabbitMq

2020-06-06  本文已影响0人  笔记本一号

中间件大比拼

主流的中间有ActiveMq、kafka、rocketMq、rabbitMq

总结
在性能要求上kafka是最好的,在数据的传输的可靠性上rabbitMq是最好的,在性能和数据传输可靠性上择中的话就是rocketMq比较适合

AMQP简述

AMQP用于提供统一消息模式服务,它是应用层的高级消息队列模式,也是消息中间件设计规范(jms也是消息中间件设计规范)的一种,rabbitma就是遵循AMQP的规范

AMQP模型结构
image.png
AMQP核心概念

rabbitmq

概述

rabbitmq是Erlang语言编写,它是开源的基于AMQP协议的消息代理和消息队列服务器,rabbitmq具有高性能、高可用、高可靠的特性,具有消息投递模式丰富的特点,rabbitmq用于使完全不同的应用之间能够共享数据互相通信。由于rabbitmq是使用Erlang编写,Erlang广泛应用于交换机领域,Erlang语言在数据交互和数据同步方面性能优秀与原生Socket一样有着出色的延迟,使得rabbitmq在数据交互上具备了天然的优势

rabbitMq消息流转过程

客户端(生产者)将预先指定消息投递到哪个VitaulHost中的路由(exchange)上,同时客户端也会指定routingKey,exchange通过消息的routingKey决定将消息投递到哪个binding的queue中,监听了相应的queue的客户端(消费者)可以从queue中获取相应的消息

image.png

rabbitMq收发消息的简单代码演示

//properties文件配置
rabbitmq.id=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtualHost=testhost

将rabbitMq注册的spring容器

@Configuration
@PropertySource("classpath:/myApplication.properties")
@ComponentScan(value = "com.example.rabbitmq.demo")
public class RabbitMqConfig {
    @Value("${rabbitmq.id}")
    private String ip;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
  /*  @Value("${rabbitmq.virtualHost}")
    private String virtualHost;*/
    @Bean
    public Connection connectionFactory(){
        try {
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(ip);
            connectionFactory.setPort(port);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

消息调用

@RestController
public class Test {
    @Autowired
    private Connection connection;
    private Channel channel=null;
    @PostConstruct
    public void newChannel() throws Exception{
         this.channel = this.connection.createChannel();
    }
    @PreDestroy
    public void destroyChannel() throws Exception{
         this.channel.close();
         this.connection.close();
    }
    @GetMapping("/consummer")
    public String consummer(){
        if (this.connection!=null){
            try {
                Channel channel = this.channel
                DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long deliveryTag = envelope.getDeliveryTag();
                        //消息id
                        System.out.println("消息id:  " + deliveryTag);
                        //交换机
                        System.out.println("交换机:  " + envelope.getExchange());
                        //路由key
                        System.out.println("路由key: " + envelope.getRoutingKey());
                        //接受到的消息
                        System.out.println("收到的消息:  " + new String(body, "utf-8"));
                        System.out.println("---------------------------------");
                        /*channel.basicAck(deliveryTag, false);*/
                    }
                };
                String exchangeName="myexchange";
                String exchangeType="direct";
                String queueName="rutingkey";
                String routingKey="rutingkey";
                channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
       //创建队列("rutingkey","持久化","独占队列","脱离exchange是否被自动删除","拓展参数")
                channel.queueDeclare(queueName,true,false,false,null);
 //设置exchange和queue的绑定关系,并且声明队列的routingKey
                channel.queueBind(queueName, exchangeName, routingKey);
                channel.basicConsume(queueName, true, defaultConsumer);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "不是空的";
        }else {
            return "是空的";
        }
    }

    @GetMapping("/procuder")
    public String procuder(){
        if (this.connection!=null){
            try {
                Channel channel = this.channel;
                String exchangeName="myexchange";
                String routingKey="rutingkey";
                String s="helloword"+UUID.randomUUID().toString();
                   //发布消息时使用rutingkey规则上
                channel.basicPublish(exchangeName,routingKey,null,s.getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "不是空的";
        }else {
            return "是空的";
        }
    }

}

confirm模式

当生产者发送消息的同时监听一个事件,如果消费者消费成功则监听到相应的ack

生产者
public class Producer {
    public static void main(String[] args) throws Exception {   
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();

        //4 指定我们的消息投递模式: 消息的确认模式 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        //6 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });

    }
}
消费者
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";
        
        //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 创建消费者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            
            System.err.println("消费端: " + msg);
        }
        
        
    }
}

限流

如果消费端的消费速度跟不上生产端的消息生产速度,那么很有可能会造成,消息大量堆积,每次都会有大量的消息一次打到消费端,给消费端造成很大的压力

设置手工签收,并且消费成功后向broker发送ack,才会让消费者继续消费消息,不然消息就会排队等待消费
basicQos的参数:

生产者
public class Producer {
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

消费者
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //参数1:消费消息的大小,0表示不做限制
        //参数2:消费者一次最多消费消息的数量
                //参数3:限流策略应用的级别,true:Channel级别;false:consumer级别
        channel.basicQos(0, 1, false);

        //参数2 限流方式  第一件事就是 autoAck设置为 false,也就是手工签收消息
                //参数3:消费端自定义监听
        channel.basicConsume(queueName, false, new MyConsumer(channel));        
    }
}
消费端自定义监听
public class MyConsumer extends DefaultConsumer {
    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //消费成功则发送ACK到broker中
        channel.basicAck(envelope.getDeliveryTag(), false); 
    }
}

不可路由消息的处理

image.png
生产者
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";
        
        String msg = "Hello RabbitMQ Return Message";
        
        
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        //参数3,true:监听器会接收到不可路由消息,然后我们可以对消息进行后续处理:存库、重发或者记录日志等。false:broker端会自动删除不可路由消息,监听器是监听不到的
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    }
}
消费者
public class Consumer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        channel.basicConsume(queueName, true, new MyConsumer(channel));
        
    }
}


public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }


}

ACK与重回队列

Nack:失败应答,重回队列的尾端
Ack:成功应答

生产者
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";

        for(int i =0; i<5; i ++){
            //自定义头信息
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
//RabbitMQ发送消息附带BasicProperties属性详解
//BasicPropertie属性字段详解
// contentType:消息的内容类型,如:text/plain
// contentEncoding:消息内容编码
// headers:设置消息的header,类型为Map<String,Object>
// deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
//  priority:消息的优先级
// correlationId:关联ID
//replyTo:用于指定回复的队列的名称
//  expiration:消息的失效时间
//  messageId:消息ID
//  timestamp:消息的时间戳
//  type:类型
// userId:用户ID
//  appId:应用程序ID
// custerId:集群ID
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}
消费者
public class Consumer {
    public static void main(String[] args) throws Exception {
            
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));    
    }
}
public class MyConsumer extends DefaultConsumer {
    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
 // 参数2:设置为true 批量消息处理,设置为false单条消息处理
 // 参数3:设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }   
    }
}

DLX:死信队列

image.png

消息变成死信有以下几种情况 :

死信队列的设置 :
生产者

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        for(int i =0; i<1; i ++){
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}

消费者
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        // 这就是一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //要进行死信队列的声明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        
        channel.basicConsume(queueName, true, new MyConsumer(channel));
        
        
    }
}
public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }


}
image.png
exchange:接收消息,并根据路由键转发消息所绑定的队列

exchange的属性

exchange的类型:

100%投递方案:

step1:将数据落地,先将业务数据存库,同时将消息记录也存库
step2:将消息投递到Mq服务器
step3:设置消息的响应状态,例如0:发送中,1:成功,2:失败,消费者等待服务端回应。
step4:消费者获取回应状态,并且将状态存库,更新库中消息的状态
step5:定时任务将超过固定时间内状态为未发送的消息抽取
step6:重新发送消息
step7:重发超过一定次数的消息,对其进行失败的标记,然后人工发送或者其他的方式进行消息的补偿


image.png
上一篇下一篇

猜你喜欢

热点阅读