Spring相关

SpringBoot 使用(四): 消息队列使用

2016-01-13  本文已影响22887人  龙白一梦

一. 概述

在整套的微服务架构中, 消息队列是不可或缺的部分, 它能够起到线程内同步或者异步调用无法达到的作用,优缺点分别是:
优点:

  1. 解耦
    i. 只依赖消息的格式, 而不依赖发送者的ip和端口
    ii. 多消费者的情况下, 发送者不需要关注消费者的任何信息
  2. 路由
    不能互相访问的网络之间可以消息队列实现访问, 可以减少对现有网络的修改。
  3. 消息可靠性
    当消费者发生故障时, 消息可以被有效保存下来, 等待恢复后继续访问.
  4. 异步调用
    发送者异步发送消息, 不等待消息ack,不会对发送者本身产生响应速度的影响, 当然异步调用也是可以实现的。
  5. 方便扩展
    集群部署消息队列, 当流量增大和减小是可以通过调整部署来实现和发送方, 消费方无关。

缺点:
多出一个环节,需要保证消息队列的可用性。

二. 常用消息队列

目前常用的消息队列大概有三种类型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它们的使用场景分别是:
1.RabbitMQ: 相对重量级高并发的情况,比如数据的异步处理 任务的串行执行等.
2.Kafka: 基于Pull的模式来处理,具体很高的吞吐量,一般用来进行 日志的存储和收集.
3.Redis: 轻量级高并发,实时性要求高的情况,比如缓存,秒杀,及时的数据分析(ELK日志分析框架,使用的就是Redis).

三. SpingBoot 集成消息队列

在SpingBoot中对这个三种都有支持

  1. Redis
    请参照 其他作者的 文章 http://www.jianshu.com/p/a2ab17707eff

  2. RabbitMQ
    i. 配置

     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    

    ii.实现方式 RabbitMQDemoConfigration.java

import org.springframework.amqp.core.;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/
*

  1. Kafka 使用
    i. pom.xml 配置
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-kafka</artifactId>
     </dependency>
    

ii. spring-integration-kafka.xml 也可以研究通过bean方式来实现

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="192.168.2.2:9092"
                                              key-class-type="java.lang.String"
                                              value-class-type="java.lang.String"
                                              topic="1_service"
                                              value-encoder="kafkaEncoder"
                                              key-encoder="kafkaEncoder"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
</beans>

iii. 客户端实现代码:

  @Autowired
    MessageChannel             inputToKafka;

    String value = "Hello Kafka";
    Message<String> mess = MessageBuilder.withPayload(value)
            .setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build();
    inputToKafka.send(mess);

四. 后记

消息中间件有很多,实际使用中往往是根据架构师的技术栈相关,做到了解使用场景和基本原理,在项目中提升自己细节能力,做到个人和公司共同成长,才是好的方式.

上一篇 下一篇

猜你喜欢

热点阅读