Spring Cloud学习day107:Stream消息分组、消息分区和服务跟踪Sleuth

2019-12-14  本文已影响0人  开源oo柒

一、Spring Cloud Stream的消息分区

1.Stream解决了什么问题:

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。参考资料

示例

2.Stream的消息分组:

直观的理解就是一群消费者一起处理消息。需要注意的是:每个发送到消费组的数据,仅由消费组中的一个消费者处理。
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,这就很可能会出现重复消费的问题,在某些场景下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.{channel-name}.group属性即可。

示例
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
spring.application.name=config-server
server.port=9020

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/

# Destination of the outputProduct channel; on the message broker this corresponds to the exchange
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
/**
 * Binding contract of the message producer: declares the single outbound channel.
 */
public interface ISendeService {

    /** Name of the outbound channel; the {@code spring.cloud.stream.bindings.outputProduct.*} keys refer to it. */
    String OUTPUT = "outputProduct";

    /**
     * @return the channel bound under {@link #OUTPUT}, used to publish messages
     */
    @Output(value = OUTPUT)
    SubscribableChannel send();
}
/**
 * Simple serializable payload carrying a product id and name.
 */
public class Product implements Serializable {

    private int id;
    private String name;

    /** Creates a product whose fields keep their default values until the setters are called. */
    public Product() {
    }

    /**
     * Creates a fully initialised product.
     *
     * @param id   product identifier
     * @param name product name
     */
    public Product(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the product identifier */
    public int getId() {
        return this.id;
    }

    /** @param id the product identifier to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the product name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the product name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return a text of the form {@code Product [id=<id>, name=<name>]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Product [id=");
        text.append(id);
        text.append(", name=");
        text.append(name);
        text.append("]");
        return text.toString();
    }
}
/**
 * Spring Boot entry point of the message producer. Enables the Eureka client and
 * the channel bindings declared by {@link ISendeService}.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ISendeService.class)
public class ISendeApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ISendeApplication.class, args);
    }
}
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
spring.application.name=config-server
server.port=9021

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/

# Destination of the inputProduct channel; on the message broker this corresponds to the exchange
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; on the message broker this corresponds to the queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
/**
 * Serializable payload received by the consumer: a product id and name.
 */
public class Product implements Serializable {

    private int id;
    private String name;

    /** @return the product identifier */
    public int getId() {
        return id;
    }

    /** @param id the product identifier to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the product name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the product name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** Creates a product whose fields keep their default values until the setters are called. */
    public Product() {
        super();
    }

    /**
     * Creates a fully initialised product.
     *
     * @param id   product identifier
     * @param name product name
     */
    public Product(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    /** @return a text of the form {@code Product [id=<id>, name=<name>]} */
    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
}
/**
 * Binding contract of the message consumer: declares the single inbound channel.
 */
public interface IReceiveService {

    /** Name of the inbound channel; the {@code spring.cloud.stream.bindings.inputProduct.*} keys refer to it. */
    String INPUT = "inputProduct";

    /**
     * @return the subscribable channel bound under {@link #INPUT}
     */
    @Input(value = INPUT)
    SubscribableChannel receiver();
}
/**
 * Listener for the consumer-group example: prints every product received on the
 * {@link IReceiveService#INPUT} channel.
 */
@Service
@EnableBinding(IReceiveService.class)
public class ReceiverService {

    /**
     * Handles one incoming message by writing it to standard output.
     *
     * @param product payload of the received message
     */
    @StreamListener(IReceiveService.INPUT)
    public void onReciver(Product product) {
        // Process the message: this example only prints it.
        String description = product.toString();
        System.out.println("Receiver-Group:" + description);
    }
}
/**
 * Spring Boot entry point of the message consumer. Enables the Eureka client and
 * the channel bindings declared by {@link IReceiveService}.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(IReceiveService.class)
public class IReceiverApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(IReceiverApplication.class, args);
    }
}

在消息的发送者项目中编写测试类。

/**
 * Integration test of the producer: publishes the same product ten times through the
 * bound output channel so the consumer-group behaviour can be observed on the receivers.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ISendeApplication.class)
public class Test {

    @Autowired
    private ISendeService isendeService;

    /**
     * Sends ten messages and fails if the channel rejects any of them.
     */
    @org.junit.Test
    public void testSend() {
        // Plain decimal literal: a leading zero would make it an octal literal.
        Product product = new Product(1, "Test Sende");
        for (int i = 0; i < 10; i++) {
            // Wrap the payload in a typed Message
            Message<Product> message = MessageBuilder.withPayload(product).build();
            boolean sent = isendeService.send().send(message);
            org.junit.Assert.assertTrue("message " + i + " was not accepted by the channel", sent);
        }
    }
}
示例 示例 示例

3.Stream的消息分区:

我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,在消费组中我们可以保证消息不会被重复消费,但是在同组下有多个实例的时候,我们无法确定每次处理消息的是不是被同一消费者消费,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了。

示例

设置消费组: spring.cloud.stream.bindings.<通道名>.group=<消费组名>
设置主题: spring.cloud.stream.bindings.<通道名>.destination=<主题名>
给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>
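(2)消费者开启分区,指定实例数量与实例索引:

开启分区: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
实例数量: spring.cloud.stream.instanceCount=<实例数量>
实例索引: spring.cloud.stream.instanceIndex=<实例索引,从0开始>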

(3)生产者指定分区键:

分区键: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpression=<分区键表达式>
分区数量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
spring.application.name=config-server
server.port=9020

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/

# Destination of the outputProduct channel; on the message broker this corresponds to the exchange
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
# Expression that yields the partition key; here the whole payload is the key
# NOTE(review): the partition is presumably derived from the key's hashCode(), so the payload type needs a content-based hashCode() - confirm, or key on a field such as payload.id
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
# Number of partitions the messages are spread over
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
public class Product implements Serializable {

    private int id;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Product() {
        super();
    }
    public Product(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
}
/**
 * Binding contract of the partitioning producer: declares the single outbound channel.
 */
public interface ISendeService {

    /** Name of the outbound channel; the {@code spring.cloud.stream.bindings.outputProduct.*} keys refer to it. */
    String OUTPUT = "outputProduct";

    /**
     * @return the channel bound under {@link #OUTPUT}, used to publish messages
     */
    @Output(value = OUTPUT)
    SubscribableChannel send();
}
/**
 * Spring Boot entry point of the partitioning producer. Enables the Eureka client and
 * the channel bindings declared by {@link ISendeService}.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ISendeService.class)
public class ISendeApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ISendeApplication.class, args);
    }
}
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
spring.application.name=config-server
server.port=9023

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/

# Destination of the inputProduct channel; on the message broker this corresponds to the exchange
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; on the message broker this corresponds to the queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
# Enable partitioned consumption on this channel
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
# Total number of consumer instances
spring.cloud.stream.instanceCount=2
# Index of this instance, counted from 0
spring.cloud.stream.instanceIndex=0
    // Fields of the consumer-side Product payload.
    // NOTE(review): the enclosing class declaration, accessors and constructors are not shown
    // in this snippet - presumably identical to the producer's Product class; confirm.
    private int id;
    private String name;
/**
 * Binding contract of the partitioned consumer: declares the single inbound channel.
 */
public interface IReceiveService {

    /** Name of the inbound channel; the {@code spring.cloud.stream.bindings.inputProduct.*} keys refer to it. */
    String INPUT = "inputProduct";

    /**
     * @return the subscribable channel bound under {@link #INPUT}
     */
    @Input(value = INPUT)
    SubscribableChannel receiver();
}
/**
 * Listener for the partitioning example: prints every product received on the
 * {@link IReceiveService#INPUT} channel.
 */
@Service
@EnableBinding(IReceiveService.class)
public class ReceiverService {

    /**
     * Handles one incoming message by writing it to standard output.
     *
     * @param product payload of the received message
     */
    @StreamListener(IReceiveService.INPUT)
    public void onReciver(Product product) {
        // Process the message: this example only prints it.
        String description = product.toString();
        System.out.println("Receiver-Partition:" + description);
    }
}
/**
 * Spring Boot entry point of the partitioned consumer. Enables the Eureka client and
 * the channel bindings declared by {@link IReceiveService}.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(IReceiveService.class)
public class IReceiverApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(IReceiverApplication.class, args);
    }
}

在消息的发送者项目里,创建测试类。

/**
 * Integration test of the partitioning producer: publishes the same product ten times
 * through the bound output channel so the partition routing can be observed on the consumers.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ISendeApplication.class)
public class Test {

    @Autowired
    private ISendeService isendeService;

    /**
     * Sends ten messages and fails if the channel rejects any of them.
     */
    @org.junit.Test
    public void testSend() {
        // Plain decimal literal: a leading zero would make it an octal literal.
        Product product = new Product(1, "Test Sende");

        for (int i = 0; i < 10; i++) {
            // Wrap the payload in a typed Message
            Message<Product> message = MessageBuilder.withPayload(product).build();
            boolean sent = isendeService.send().send(message);
            org.junit.Assert.assertTrue("message " + i + " was not accepted by the channel", sent);
        }
    }
}
示例
示例

二、服务跟踪Sleuth:

1.为什么使用微服务跟踪?

示例

2.创建Sleuth案例:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <!-- 添加 Feign 坐标 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
        <!-- 添加sleuth-product-service 坐标 -->
        <dependency>
            <groupId>com.zlw</groupId>
            <artifactId>springcloud-sleuth-product-service</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
    </dependencies>
spring.application.name=sleuth-consumer
server.port=9020

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
/**
 * Feign client for the service registered as {@code sleuth-product-provider}.
 * It adds no methods of its own; everything is inherited from {@link ProductService}.
 */
@FeignClient(value = "sleuth-product-provider")
public interface ConsumerProductService extends ProductService {
}
/**
 * REST endpoint of the consumer application; delegates to the Feign client.
 */
@RestController
public class ConsumerController {

    @Autowired
    private ConsumerProductService productService;

    /**
     * Handles {@code GET /list}.
     *
     * @return whatever {@link ConsumerProductService#findAll()} returns
     */
    @RequestMapping(method = RequestMethod.GET, value = "/list")
    public List<Product> list() {
        return productService.findAll();
    }
}
/**
 * Spring Boot entry point of the Sleuth consumer. Enables the Eureka client and Feign clients.
 */
@SpringBootApplication
@EnableFeignClients
@EnableEurekaClient
public class ConsumerApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
 <configuration>
<!-- Directory where log files are stored; do not use a relative path in the Logback configuration. -->
<!-- NOTE(review): catalina.base is normally set by Tomcat; confirm it is defined when this runs as a standalone Spring Boot application. -->
    <property name="LOG_HOME" value="${catalina.base}/logs/" />  
    <!-- Console output -->
    <appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
       <!-- Format of the console log lines -->
        <layout class="ch.qos.logback.classic.PatternLayout">   
             <!-- Pattern: %d = date, %thread = thread name, %-5level = level left-aligned to 5 characters, %msg = log message, %n = line break -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n   
            </pattern>   
        </layout>   
    </appender>   
    <!-- Produce one log file per day -->
    <appender name="RollingFile"  class="ch.qos.logback.core.rolling.RollingFileAppender">   
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- File name pattern of the rolled log files -->
            <FileNamePattern>${LOG_HOME}/server.%d{yyyy-MM-dd}.log</FileNamePattern>   
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>   
        <layout class="ch.qos.logback.classic.PatternLayout">  
            <!-- Pattern: %d = date, %thread = thread name, %-5level = level left-aligned to 5 characters, %msg = log message, %n = line break -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n   
            </pattern>   
       </layout> 
        <!-- Maximum size of a log file -->
        <!-- NOTE(review): a size-based trigger is combined with a time-based rolling policy here; confirm against the Logback manual that this combination rolls as intended. -->
       <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
         <MaxFileSize>10MB</MaxFileSize>
       </triggeringPolicy>
    </appender>     

    <!-- Root log level and the appenders it writes to -->
    <root level="DEBUG">   
        <appender-ref ref="Stdout" />   
        <appender-ref ref="RollingFile" />   
    </root> 

<!-- Asynchronous logging to a database (disabled) -->
<!--     <appender name="DB" class="ch.qos.logback.classic.db.DBAppender">
        asynchronous logging to a database
        <connectionSource class="ch.qos.logback.core.db.DriverManagerConnectionSource">
           connection pool
           <dataSource class="com.mchange.v2.c3p0.ComboPooledDataSource">
              <driverClass>com.mysql.jdbc.Driver</driverClass>
              <url>jdbc:mysql://127.0.0.1:3306/databaseName</url>
              <user>root</user>
              <password>root</password>
            </dataSource>
        </connectionSource>
  </appender> -->
</configuration>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
/**
 * HTTP contract of the product service, mapped under {@code /product}.
 */
@RequestMapping(value = "/product")
public interface ProductService {

    /**
     * Queries all products; mapped to {@code GET /findAll} relative to the type-level path.
     *
     * @return the list of products
     */
    @RequestMapping(method = RequestMethod.GET, value = "/findAll")
    List<Product> findAll();
}
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 添加 product-service 坐标 -->
        <dependency>
            <groupId>com.zlw</groupId>
            <artifactId>springcloud-sleuth-product-service</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
spring.application.name=sleuth-product-provider
server.port=9001

# Addresses of the Eureka service registry (two peers)
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
# ---- MyBatis / MySQL ----
mybatis.type-aliases-package=com.book.product.pojo
# Location pattern of the MyBatis mapper XML files
mybatis.mapper-locations=classpath:com/book/product/mapper/*.xml

# JDBC data source
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/book-product?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=root
/**
 * Service layer of the product provider; reads products through {@code ProductMapper}.
 */
@Service
public class ProductServiceImpl {

    @Autowired
    private ProductMapper productMapper;

    /**
     * Queries all products using an example without any criteria set.
     *
     * @return the list returned by the mapper
     */
    public List<Product> findAllProduct() {
        // selectByExampleWithBLOBs also returns the text (BLOB) columns
        return productMapper.selectByExampleWithBLOBs(new ProductExample());
    }
}
/**
 * REST implementation of {@link ProductService}; forwards each call to the service layer.
 */
@RestController
public class ProductServiceController implements ProductService {

    @Autowired
    private ProductServiceImpl productLookup;

    /**
     * @return all products as returned by {@link ProductServiceImpl#findAllProduct()}
     */
    @Override
    public List<Product> findAll() {
        List<Product> products = productLookup.findAllProduct();
        return products;
    }
}
示例
Sleuth日志分析讲解
上一篇下一篇

猜你喜欢

热点阅读