消息队列使用与改造点

2018-03-30  本文已影响24人  东郭先生李

序言

文档背景

消息队列改造是双创框架升级工作的一部分。

文档主题

文档主要讲述消息队列代码更新后,新的使用方式和如何使用原有的消费者模式完成业务逻辑。

文档结构图

[站外图片上传中...(image-e177db-1522380525219)]

文档变更历史

作者 日期 版本 变更点
李清泉 2018-3-29 0.5 创建文档

配置文件的写法

配置数据源

配置数据源有多种方式,我使用过有效的有两种:

  1. 单独配置一个源
  2. 配置到绑定

单独配置一个源

rabbitmq:
  addresses: amqp://192.168.1.241:5672
  username: mqadmin
  password: mqadmin

单独配置的方式是使用rabbitmq作为顶层配置,然后在其他配置中引用,如绑定时使用:

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: ${rabbitmq.addresses}
                username: ${rabbitmq.username}
                password: ${rabbitmq.password}
                virtual-host: test1

在上述代码中引用了单独配置。这种好处是可以配置一次多处引用,避免重复写。

配置到绑定

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1

这种方式是直接写到绑定上。如果只有一个配置,可以这么写,但如果有多个绑定并且用同一个数据源,就变成了冗余。

绑定数据源

绑定数据源是将数据源配置到一个变量之中,方便配置接收或者发送时使用。上述已经说过配置方法了。在下面的配置代码中,是将一个数据源配置到变量 rabbit1之中。

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1

可以绑定多个的,如果下面的配置:

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1
    rabbit2:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.245:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test2

上面的代码配置了两个变量,rabbit1和rabbit2. 在接收和发送配置中可以引用这两个不同的变量代表不同的源。

配置接收

spring:
  cloud:
    stream:
      bindings:
        input1:
          binder: rabbit1
          contentType: text/plain
          destination: testquene

上述代码中,定义一个叫做 input1的接收,内容格式为文本,目标(队列名)为testquene ,绑定到rabbit1源变量(上述的绑定配置)。

配置发送

spring:
  cloud:
    stream:
      bindings:
        output1:
          binder: rabbit1
          destination: testquene2
          contentType: text/plain

上述代码中,配置了一个叫做output1的消息发送,目标(队列名)为testquene2,模式是文本。

完整配置示例

server:
  port: 9087

rabbitmq:
  addresses: amqp://192.168.1.241:5672
  username: mqadmin
  password: mqadmin

spring:
  cloud:
    stream:
      bindings:
        input1:
          binder: rabbit1
          #group: test.qqqq
          contentType: text/plain
          destination: mqTestDefault.test.qqqq
        input2:
          binder: rabbit1
          #group: test.qqqq
          contentType: text/plain
          destination: quene1
        output1:
          binder: rabbit1
          destination: mqTestDefault.test.qqqq
          contentType: text/plain
        output2:
          binder: rabbit1
          destination: quene1
          contentType: text/plain
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: ${rabbitmq.addresses}
                username: ${rabbitmq.username}
                password: ${rabbitmq.password}
                virtual-host: test1
  #              exchange:
      defaultBinder:

消息队列使用方式

消息队列涉及到两种操作:

下面分别说明这两个操作的使用方式

消息队列的接收处理

消息队列的接收处理有两种方式:

  1. 直接使用监听
  2. 使用适配原有的消费模式

将配置文件中的接收与发送定义到代码中

无论是接收还是发送消息。都要先在一个接口类中定义信道。接收时,定义为SubscribableChannel ;发送时,定义为MessageChannel 。示例:

public interface Sink {
    String INPUT1 = "input1";

    String INPUT2="input2";

    String OUTPUT1="output1";
    String OUTPUT2="output2";

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input12();

    @Output(OUTPUT1)
    MessageChannel output1();
    @Output(OUTPUT2)
    MessageChannel output2();
}

上述代码中,分别定义了两个接收信道和两个发送信息。请务必注意名称一定要对应对配置文件中。比如上述的input1 input2 output1 output2 ,必须在配置文件中存在的。
上述的定义是注册到spring之中的,使用时,只需要使用相应的名称的bean即可以。如果使用input1的bean名称,即为input1的接收信道。

使用监听的方式

定义好接收与发送的spring bean后,可以在监听中使用接收了。

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager extends AbstractMessageQuene implements MessageQueueManager{
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT1)
    public void input1(Message<String> message) {
        System.out.println("第一个队列:" + message.getPayload());
        doSomething(message.getPayload());
    }
}

上述代码中,StreamMessageQueneManager 这个类为spring bean,使用了@EnableBinding(Sink.class)注解。代表将已经定义好的Sink接口中的定义好的接收在这个类中开启监听。在public void input1(Message<String> message)方法头上,加上了@StreamListener(Sink.INPUT1)注解,目的是将Sink中的INPUT1代表的接收在这个方法上开启监听。当监听收到消息时,将自动调用public void input1(Message<String> message)方法,传入message对象,我们就可以使用这个对象执行任何逻辑。

使用适配原有消费模式

原来的双创是使用生产-消费模式处理消息的,我们原来是使用MessageQueueManager接口接收和发送信息的,这个接口代码如下:

public interface MessageQueueManager {
    /**
     * 发送消息
     *  @param queueName 队列名称
     *  @param message 放入队列的内容
     */
     void sendMessage(String queueName, Object message);

    /**
     * 获取通道消息
     *  @param queueName 队列名称
     *  @return message 队列的内容
     */
     Object getMessage(String queueName);

  
}

Object getMessage(String queueName);方法中,我们通过传入队列名的方法,主动获取消息的。因此改造后为了减少代码变动,这种方式保持不变。原有的代码不需要变动即可正常执行。原理是使用了适配的方法:

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager  implements MessageQueueManager{
    private Map<String,Queue<String>> queueMap=new HashMap<>();
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT1)
    public void input1(Message<String> message) {
        System.out.println("第一个队列:" + message.getPayload());
        Queue queue = getQueue(Sink.INPUT1);
        queue.offer(message.getPayload());
    }
    private Queue getQueue(String queneName) {
        Queue queue=queueMap.get(queneName);
        if(queue==null){
            queue=new ConcurrentLinkedQueue();//非阻塞
            queueMap.put(queneName,queue);
        }
        return queue;
    }
     @Override
    public Object getMessage(String queueName) {
        Queue queue = getQueue(queueName);
        return queue.poll();
    }
    
    @Override
    public void sendMessage(String queueName, Object message) {
        MessageChannel channel=channelMap.get(queueName);
        if(channel==null){
            throw new RuntimeException(queueName+"对应的信通不存在!");
        }
        if(message!=null) {
            channel.send(MessageBuilder.withPayload(message).build());
        }
    }
}

在上述接口的实现类中,监听将input1收到的消息放入了临时非阻塞且线程安全的ConcurrentLinkedQueue中。业务逻辑通过定时主动调用public Object getMessage(String queueName),获取到目标队列并取出消息执行逻辑。值得注意的是由于ConcurrentLinkedQueue没有限制容量,如果不能及时消费掉里面存储的消息,可能会造成内存占用过多甚至溢出,因此需要考虑消费的速度和调用的间隔。

发送消息

消息的发送也有两种方式:

  • 使用 messageChannel
  • 使用原有生产消费模式接口

使用 messageChannel

相对于使用监听,发送也可以使用新的messageChannel方式。

@Service
public class TestSender  {
    @Autowired
    @Qualifier("output1")
    MessageChannel output1;
    
    public void send(){
        output1.send(MessageBuilder.withPayload("您好,这是一个测试消息").build());
    }
}

上述代码中,output1为spring bean的名称,代表了发送的信道代理。直接使用它就可以将消息发送到对应信道。

使用原有的适配

请参考上述的MessageQueueManager接口,调用public void sendMessage(String queueName, Object message) 指定队列名即可。原理是,适配代码中已将messagechannel封装起来:

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager  implements MessageQueueManager{
    @Autowired
    @Qualifier("output1")
    MessageChannel output1;

    @Autowired
    @Qualifier("output2")
    MessageChannel output2;
    
    private Map<String,MessageChannel> channelMap;
    
     /**
     * bean初始化后执行这个方法
     */
    @PostConstruct
    public void postConstruct(){
        if (channelMap == null) {
            channelMap = new HashMap<>();
            channelMap.put(Sink.OUTPUT1, output1);
            channelMap.put(Sink.OUTPUT2, output2);
        }
    }
    
    @Override
    public void sendMessage(String queueName, Object message) {
        MessageChannel channel=channelMap.get(queueName);
        if(channel==null){
            throw new RuntimeException(queueName+"对应的信通不存在!");
        }
        if(message!=null) {
            channel.send(MessageBuilder.withPayload(message).build());
        }
    }
    
    @Override
    public Object getMessage(String queueName) {
        Queue queue = getQueue(queueName);
        return queue.poll();
    }

}

上述代码中,channel被放入Map中,通过队列名可以取出,然后发送消息。原理本质上是适配。

改造点

原有的双创中,只有solr和微博模块有用到消息队列的功能。
我认为,至少下面的模块可以用消息队列:

消息队列应用点

消息队列主要应用到以下情境:

  • 不要求该分逻辑与主逻辑实时性
  • 执行慢的逻辑
  • 任务协调 分布式集群中,同一个实例可以通过消息队列接收的唯一实例的特性进行任务协调。

“附合要求”的其他模块是可以使用消息队列的,这需要我们在后续优化。

上一篇下一篇

猜你喜欢

热点阅读