Spring Cloud从入门到谈笑风生-下

2020-07-29  本文已影响0人  西海岸虎皮猫大人

Spring Cloud从入门到谈笑风生-上
仓库:
https://gitee.com/iacs/scdemo.git

10 消息驱动-Stream

10.1 概述

常见mq 4种,activemq\rabbitmq\rocketmq\kafka
系统中可能出现2种mq,切换\维护\开发成本高
stream屏蔽mq底层细节,使用适配方式在mq间切换,统一消息编程模型
通过Binder实现
应用程序通过inputs和outputs与stream中binder对象交互
仅支持rabbitmq和kafka

设计思想

定义binder中间层,实现与mq底层细节解耦
遵循发布-订阅模式
生产者->source->channel->binder->mq->binder->channel->sink->消费者

10.2 生产者消费者实现

10.2.1 生产者

新建模块 scdemo-stream-provider
依赖
eureka-client\starter-web\starter-actuator\starter-test\devtools

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

配置

server:
  port: 8801

spring:
  application:
    name: scdemo-stream-provider
  cloud:
    stream:
      # 要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用于binder整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.68.101
                port: 5672
                username: guest
                password: guest
      # 服务整合处理
      bindings:
        # 通道名称
        output:
          # 要使用的exchange名称定义
          destination: studyExchange
          # 消息类型,此处为json,文本则设置"text/plain"
          content-type: application/json
          binder: defaultRabbit

# 服务注册到Eureka
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

service接口

public interface IMessageProvider {
    public String send();
}

service实现

// 定义消息推动通道
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    // 消息发送管道
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: " + serial);
        return null;
    }
}

controller

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value="/sendMessage")
    public String sendMessage() {
       return messageProvider.send();
    }
}
10.2.2 消费者

新建模块scdemo-stream-consumer
依赖与生产者相同
配置copy生产者将output修改为input

...
      bindings:
        # 通道名称
        input:
          # 要使用的exchange名称定义
          destination: studyExchange
...

controller

@RestController
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("端口: " + serverPort + ", 接收消息: " + message.getPayload() );
    }
}

启动生产者\消费者\eureka,访问:
http://localhost:8801/sendMessage
消费者控制台打印出消息

10.3 分组消费与持久化

重复消费问题

订单被支付两次,使用Stream中的消息分组解决

重复消费模拟

消费者启动8002\8003两个实例,生产者发送一条消息,两个消费者都打印出该消息

消息分组

同一组内存在消费问题,只被消费一次
消费者配置分组

# yml格式
spring.cloud.stream.bindings.input.group=dfun-a

启动两个消费者实例,生产者发送消息,消息只被消费一次

若启动消费者两个实例时指定不同的配置

Program arguments: --spring.cloud.stream.bindings.input.group=dfun-b

则消息被消费两次

持久化

如果生产者了若干条消息,其中一个消费者去掉了分组属性,启动后不会消费消息,另一个消费者重启后会消费所有消息,避免了消息的丢失

11 链路跟踪-Sleuth

11.1 概述

微服务调用复杂,某个节点高延迟或者错误都会造成请求失败,需要链路跟踪
sleuth提供链路跟踪的完整解决方案并兼容zipkin,提供网页形式展现

11.2 搭建

11.1 zipkin server

spring cloud从F版之后不需要自己搭建zipkin server,只需调用jar包即可
下载 zipkin-server-2.12.9-exec.jar:
https://dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server/
运行:

java -jar .\zipkin-server-2.12.9-exec.jar

控制台:
http://localhost:9411/zipkin/
一条链路通过trace id唯一标示,span标识请求

11.2.2 服务监控

scdemo-order和scdemo-payment模块分别添加依赖和配置(相同)
依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
        </dependency>

配置

spring:
...
  zipkin:
    base-url: http://localhost:9411
    sleuth:
      sampler:
        # 采样率值介于0到1之间,1表示全部采集
        probability: 1

payment模块添加接口

    @GetMapping("/payment/zipkin")
    public String zipkin() {
        return "hi,zipkin...";
    }

order模块添加消费者接口

    @GetMapping("/consumer/payment/zipkin")
    public String zipkin() {
        String result = restTemplate.getForObject("http://localhost:8001" + "/payment/zipkin", String.class);
        return result;
    }

请求消费者接口,在zipkin控制台即可看到调用链

12 Nacos-服务注册和配置中心

12.1 Spring Cloud Alibaba概述

出现原因: spring cloud netflix进入维护模式
限流降级\服务发现\配置管理\消息驱动\对象存储...
文档:
https://github.com/alibaba/spring-cloud-alibaba/blob/master/README-zh.md

12.2 Nacos概述

注册中心+配置中心
eureka+config+bus
文档:
https://nacos.io/zh-cn/
下载(github龟速):
https://pan.baidu.com/s/1186nmlqPGows9gUZKAx8Zw 提取码:rest
解压,运行bin\startup.cmd
访问控制台:
localhost:8848/nacos
使用账户nacos nacos登录

12.3 服务注册

12.3.1 服务提供者

新建模块scdemo-ali-payment
依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

配置

server:
  port: 9001

spring:
  application:
    name: scdemo-ali-payment
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848

management:
  endpoints:
    web:
      exposure:
        include: '*'

controller

@RestController
public class PaymentController {
    @Value("${server.port}")
    private String serverPort;

    @GetMapping(value="/payment/nacos/{id}")
    public String getById(@PathVariable("id") Integer id) {
        return "nacos registry, serverPort: " + serverPort;
    }
}

启动类添加@EnableDiscoveryClient注解,启动.
nacos控制台可以看到服务已被注册

12.3.2 服务消费者

新建模块scdemo-ali-order
依赖与提供者相同,启动类添加@EnableDiscoveryClient
配置

server:
  port: 83
spring:
  application:
    name: scdemo-ali-order
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
service-url:
  scdemo-ali-payment: http://scdemo-ali-payment

配置类

@Configuration
public class ApplicationContextConfig {
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }
}

controller

@RestController
@Slf4j
public class AliOrderController {
    @Resource
    private RestTemplate restTemplate;
    @Value("${service-url.scdemo-ali-payment}")
    private String serviceUrl;

    @GetMapping("/consumer/payment/nacos/{id}")
    public String paymentInfo(@PathVariable("id") Integer id) {
        return restTemplate.getForObject(serviceUrl + "/payment/nacos/" + id, String.class);
    }

}

服务提供者启用两个端口实例,调用消费者接口可以看到nacos自带负载均衡,因为整合了ribbon
*** 注意: RestTemplate要加@LoadBalanced注解

12.4 注册中心对比

nacos支持CP和AP切换
nacos\consul支持跨注册中心,eureka和zk不支持
如果在服务级别编辑和存储配置,则CP必须,k8s适用CP

12.5 配置中心

新建模块scdemo-ali-config
依赖在12.3基础上添加

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>

配置
bootstrap.yml(加载优先级高于application.yml)

server:
  port: 3377
spring:
  application:
    name: scdemo-ali-config
  cloud:
    nacos:
      discovery:
        # 注册中心地址
        server-addr: localhost:8848
      config:
        # 配置中心地址
        server-addr: localhost:8848
        file-extension: yaml

application.yml

spring:
  profiles:
    # 表示开发环境
    active: dev

controller

package cn.dfun.demo.scdemo.ali.config.controller;
import ...
@RestController
@RefreshScope // 支持动态刷新
public class ConfigClientController {
    @Value("${config.info}")
    private String configInfo; // 从配置中心拉取配置

    @GetMapping("/config/info")
    public String getConfigInfo() {
        return configInfo;
    }
}

nacos控制台添加配置
Data ID格式为 服务名-环境.后缀类型
** 注意: Data ID是.yaml不是.yml

Data ID: scdemo-ali-config-dev.yaml
配置格式: 选择YAML
自定义内容:

config: 
  info: nacos config center, version=1  

访问: http://localhost:3377/config/info,可以看到配置信息,nacos控制台配置更新后客户端配置信息也实时更新,nacos配置刷新要比bus简洁

12.5 分类配置

多环境,多个微服务子项目
namespace+group+data id,namespace区分部署环境,group+data id区分目标对象,这种划分可以适应异地双活

12.5.1 通过data id区分多环境

nacos新建配置
scdemo-ali-config-test.yaml
修改application.yml

spring:
  profiles:
    active: test
#    active: dev

启动项目访问接口可见获取到了test环境的配置
** 注意: 修改配置热部署不好用,需重新启动项目

12.5.2 group方案

nacos新增两个配置
dataid同为scdemo-ali-config-info.yaml
group分别为DEV_GROUP和TEST_GROUP
修改application.yml的环境为info
bootstrap.yml指定组名

      config:
        ...
        file-extension: yaml
        group: TEST_GROUP

通过修改组名即可获取到不同的配置文件

12.5.3 namespace方案

nacos控制台添加两个命名空间dev和test
配置文件指定namespace,值自动copy生成的随机串

      config:
        ...
        file-extension: yaml
        namespace: 273d9a39-ca89-4304-83e0-d68e633eaf91
        group: TEST_GROUP

则客户端会查找namespace下对应group的配置

12.6 nacos集群和持久化配置

nacos默认使用内嵌式的数据库,需要持久化到mysql
配合nginx搭建nacos集群

12.6.1 win环境下的持久化配置

conf\nacos-mysql.sql导入mysql
修改application.properties
添加内容

spring.datasource.platform=mysql

db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.url.1=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
12.6.2 linux集群配置
a.安装mysql

yum方式下载慢,这里使用解压版方式
下载:
https://downloads.mysql.com/archives/community/
这里选择Linux-Generic: mysql-5.7.30-linux-glibc2.12-x86_64.tar

# 解压
tar -xvf mysql-5.7.30-linux-glibc2.12-x86_64.tar
# 再次解压
tar -xvf mysql-5.7.30-linux-glibc2.12-x86_64.tar.gz -C /usr/local/
cd /usr/local/
mv mysql-5.7.30-linux-glibc2.12-x86_64/ mysql
# 创建用户组
groupadd mysql
# 创建用户,加到用户组
useradd -r -g mysql mysql
# 更改目录所属用户
chown -R mysql mysql/
# 更改用户所属组
chgrp -R mysql mysql/

cd mysql
# 初始化
./bin/mysql_install_db --user=mysql --basedir=/usr/local/mysql/ --datadir=/usr/local/mysql/data/
# 复制配置
cp -a ./support-files/mysql.server /etc/init.d/mysqld
# 修改配置根目录和数据目录
vim /etc/init.d/mysqld

basedir=/usr/local/mysql
datadir=/usr/local/mysql/data

# 删除系统默认配置
rm -rf /etc/my.cnf
# 初始化
./bin/mysqld_safe --user=mysql &
# 重启
/etc/init.d/mysqld restart
# 设置开机启动
chkconfig --level 35 mysqld on
# 查看随机生成的密码
cat /root/.mysql_secret
# 登录
./bin/mysql -uroot -p

# 设置root密码
SET PASSWORD = PASSWORD('root');
flush privileges;
# 设置远程访问
use mysql;
update user set host = '%' where user = 'root';
exit;
# 重启
/etc/init.d/mysqld restart
b.安装nginx
# 解压
tar -zxvf nginx-1.13.6.tar.gz
# 编译安装
cd nginx-1.13.6
./configure --prefix=/usr/local/nginx
make install
# 启动
cd /usr/local/nginx/sbin/
./nginx
c.mysql执行nacos脚本

新建数据库: nacos_config
脚本位置: nacos/conf
若执行报错:

this is incompatible with sql_mode=only_full_group_by...

可修改mysql配置添加下面一行,然后重启

sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
d.nacos数据库配置
# cd /usr/local/nacos/conf
cp application.properties application.properties.bak
vim application.properties

# 文件末尾添加
spring.datasource.platform=mysql

db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
e.nacos集群配置

** 注意: 不能使用127.0.0.1

cp cluster.conf.example cluster.conf
vim cluster.conf

# 配置内容(原内容清空)
192.168.68.101:3333
192.168.68.101:4444
192.168.68.101:5555
f.修改启动脚本
cd ../bin
cp startup.sh startup.sh.bak
vim startup.sh

添加端口参数

# 添加p参数(可搜索getopts定位)
while getopts ":m:f:s:p:" opt
do
    case $opt in
        m)
            MODE=$OPTARG;;
        f)
            FUNCTION_MODE=$OPTARG;;
        s)
            SERVER=$OPTARG;;
        # 添加端口
        p)
            PORT=$OPTARG;;
...

# 倒数第二行添加 -Dserver.port=${PORT}
nohup $JAVA -Dserver.port=${PORT} ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
f.nginx配置
cd /usr/local/nginx/conf
cp nginx.conf nginx.conf.bak
# 修改配置
vi nginx.conf
------------------------------
    #gzip  on;
    # 添加cluster配置
    upstream cluster{
        server 127.0.0.1:3333;
        server 127.0.0.1:4444;
        server 127.0.0.1:5555;
    }

    server {
    server {
        # 修改端口
        listen       1111;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            # 注掉下面两行
            # root   html;
            # index  index.html index.htm;
            # 添加该行
            proxy_pass http://cluster;
        }
g.启动集群
# 启动3个nacos实例
cd /usr/local/nacos/bin
./startup.sh -p 3333
./startup.sh -p 4444
./startup.sh -p 5555

# 查看启动实例个数
ps -ef|grep nacos|grep -v grep|wc -l

# 启动nginx
cd /usr/local/nginx/sbin
# 先停止nginx
./nginx -s quit
# 指定配置启动
./nginx -c /usr/local/nginx/conf/nginx.conf

访问:
http://192.168.68.101:1111/nacos
集群管理->节点列表可以查看节点状态
控制台添加一个配置,可以看到config_info表中增加了一条记录

h.服务注册到集群

scdemo-ali-payment模块,修改配置

        # nacos单节点配置
#        server-addr: localhost:8848
        # nacos集群配置(nginx地址)
        server-addr: 192.168.68.101:1111

启动服务,可以看到服务已注册

13 Sentinel-熔断\限流

13.1 概述

官网
中文文档

Hystrix问题

需要手工搭建监控平台
web页面没有细粒度的配置,流控\速率\熔断\降级

Sentinel优势

单独的组件,界面统一细粒度配置
承接阿里近10年的双十一

下载安装

这里使用sentinel-dashboard-1.7.2.jar

java -jar .\sentinel-dashboard-1.7.2.jar

访问: http://localhost:8080/#/dashboard/home
使用 sentinel/sentinel登录

13.2 服务监控

新建模块scdemo-ali-sentinel,启动类加@EnableDiscoveryClient

依赖

web\actuator\nacos-discovery+

 <!-- 持久化用到 -->
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
配置

application.yml

server:
  port: 8401
spring:
  application:
    name: scdemo-ali-sentinel
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    sentinel:
      transport:
        # sentinel dashboard地址
        dashboard: localhost:8080
        # 默认8719端口,如被占用从8719开始+1扫描直到找到未占用的端口
        port: 8719
management:
  endpoints:
    web:
      exposure:
        include: '*'

** 注意:
如果报异常:

Caused by: com.alibaba.nacos.api.exception.NacosException: endpoint is blank

是由于引入了nacos config依赖但没进行相应配置导致,把config依赖暂时注掉即可
启动本地win nacos,访问:
http://localhost:8401/testA
http://localhost:8401/testB
在sentinel控制台可以看到监控结果,sentinel监控是懒加载的需要请求后才能看到

13.2 流控规则

13.2.1 流控模式
a.直接

默认快速失败
sentinel控制台->簇点链路->选择链路点击+流控
输入单机阈值(默认QPS),点新增
执行web请求,当超过阈值时会被限流
当选择线程数时,表示服务器的并发线程,可对接口进行睡眠延迟测试效果

b.关联

当与A关联的资源B达到阈值后,限流A
场景:
支付接口压力大时限流下单接口
选择链路点击+流控->高级选项->选择关联
关联资源: /testB
使用postman模拟密集请求/testB
可以看到/testA接口被限流

13.2.2 流控效果
a.快速失败

即超过阈值直接抛异常,给出友好提示

b.预热

系统长期处于低水位时,流量陡增,直接加到高水位可能导致系统崩溃
初始阈值=阈值/coldFactor(默认3),经过预热时长达到阈值
如配置阈值为10,预热时长为3时:
初始阈值为3,经过5秒达到阈值10

c.排队等待

让请求以均匀速度通过,使用漏桶算法

13.3 降级规则

与hystrix高度相似

RT-平均响应时间

1秒内平均响应时间超过阈值,下一时间窗口熔断

异常比例

每秒请求数>N(可配置)且异常比例超过阈值,下一时间窗口熔断

异常数

1分钟内异常数超过阈值,下一时间窗口熔断

熔断降级规则比较绕,详见文档
sentinel熔断没有半开状态

13.4 热点规则

13.4.1 概述

热点场景: 统计一段时间内最常购买的商品ID或者用户ID并进行限制;
根据热点参数限流;
@SentinelResource和@HystrixCommand高度相似

13.4.2 热点key测试

controller新增方法

    @GetMapping("/testHotKey")
    // value名称唯一即可,建议与方法名一致
    @SentinelResource(value="testHotkey", blockHandler = "dealTestHotKey")
    public String testHotkey(@RequestParam(value="p1", required = false) String p1,
                              @RequestParam(value="p2", required = false) String p2) {
        return "-------testHotkey";
    }

    public String dealTestHotKey(String p1, String p2, BlockException exception) {
        return "-------dealTestHotKey";
    }

sentinel控制台点击+热点,配置参数索引(0代表参数p1)和阈值(只支持QPS),请求携带p1参数超过阈值即限流
dealTestHotKey只处理限流,不处理运行时异常

13.4.3 参数例外项

期望参数是某个特殊值时,和其他的限流值不同
控制台->热点规则->编辑->高级选择
类型: String 值: 5 阈值: 100
当携带参数&p1=5时超过100才会限流

13.5 系统规则

从整体维度限流,支持自适应\RT\线程数\入口QPS\CPU使用率
使用起来较为危险

13.6 SentinelResource注解

13.6.1 根据资源名称限流

通过SentinelResource指定限流方法

@RestController
@Slf4j
public class RateLimitController {
    /**
     * 根据资源名称限流,另外还可根据url限流
     */
    @GetMapping("/byResource")
    @SentinelResource(value="byResource", blockHandler = "handleException")
    public CommonResult byResource() {
        return CommonResult.success();
    }

    public CommonResult handleException(BlockException exception) {
        log.info(exception.getClass().getCanonicalName() + "服务不可用");
        return CommonResult.fail();
    }
}

类似地,还可以根据url限流

13.6.2 指定限流类

实现限流方法与业务逻辑的解耦
实现限流类

public class CustomBlockHandler {
    public static CommonResult handleException(BlockException exception) {
        return new CommonResult(4444, "客户自定义,global handleException");
    }
}

接口

    /**
     * 自定义限流处理类和处理方法
     */
    @GetMapping("/customBlockHandler")
    @SentinelResource(value="byResource",
            blockHandlerClass=CustomBlockHandler.class, // 指定处理类
            blockHandler="handleException") // 指定处理方法
    public CommonResult customBlockHandler() {
        return CommonResult.success();
    }

通多指定方法名可以切换限流方法

13.6.3 Sentinel 3个核心API

Sphu 资源
Tracer 统计
ContextUtil 上下文
仅作为理解

13.7 熔断

13.7.1 熔断配置

新建scdemo-ali-order-fallback模块,依赖nacos\sentinel\common

配置
server:
  port: 84
spring:
  application:
    name: scdemo-ali-order-fallback
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    sentinel:
      transport:
        # sentinel dashboard地址
        dashboard: localhost:8080
        # 默认8719端口,如被占用从8719开始+1扫描直到找到未占用的端口
        port: 8719
service-url:
  scdemo-ali-payment: http://scdemo-ali-payment

management:
  endpoints:
    web:
      exposure:
        include: '*'
配置类
@Configuration
public class ApplicationContextConfig {
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }
}
controller
@RestController
@Slf4j
public class AliOrderController {
    @Resource
    private RestTemplate restTemplate;
    @Value("${service-url.scdemo-ali-payment}")
    private String serviceUrl;

    @GetMapping("/fallback/payment/{id}")
    // 未配置fallback方法直接报异常
//    @SentinelResource(value = "fallback")
    // 只指定降级方法
//    @SentinelResource(value = "fallback", fallback = "handleFallback")
    // 只指定限流方法,第一次请求抛异常,限流后被blockHandler处理
//    @SentinelResource(value = "fallback", blockHandler = "handleBlock")
    // 同时指定限流降级方法,第一次请求被降级,限流后被blockHandler处理
//    @SentinelResource(value = "fallback", blockHandler = "handleBlock", fallback = "handleFallback")
    @SentinelResource(value = "fallback", blockHandler = "handleBlock", fallback = "handleFallback",
        // 忽略异常
        exceptionsToIgnore={IllegalArgumentException.class})
    public CommonResult fallback(@PathVariable Long id) {
        String url = serviceUrl + "/payment/" + id;
        CommonResult result = restTemplate.getForObject(url, CommonResult.class, id);
        if(id == 4) {
            throw new IllegalArgumentException("非法参数");
        } else if(result.getData() == null) {
            throw new NullPointerException("空指针");
        }
        return result;
    }

    /**
     * 降级处理
     */
    public CommonResult handleFallback(@PathVariable Long id, Throwable e) {
        Payment payment = new Payment(id, "null");
        String msg = "handleFallback, exception: " + e.getMessage();
        return CommonResult.success(msg, payment);
    }

    /**
     * 限流处理
     */
    public CommonResult handleBlock(@PathVariable Long id, BlockException e) {
        Payment payment = new Payment(id, "null");
        String msg = "handleBlock, exception: " + e.getMessage();
        return CommonResult.success(msg, payment);
    }

}
ali-payment模块controller添加
    public static HashMap<Long, Payment> hashMap = new HashMap<>();
    // 模拟dao处理
    static {
        hashMap.put(1L, new Payment(1L, "1111111"));
        hashMap.put(2L, new Payment(1L, "2222222"));
        hashMap.put(3L, new Payment(1L, "3333333"));
    }
...
    @GetMapping(value="payment/{id}")
    public CommonResult<Payment> getById(@PathVariable("id") Long id) {
        Payment payment = hashMap.get(id);
        String msg = "port:" + serverPort;
        CommonResult<Payment> result = CommonResult.success(msg, payment);
        return result;
    }
测试

ali-payment模块启动两个端口实例,启动ali-order-fallback模块和nacos\sentinel
i.未配置限流\降级方法,直接抛异常
ii.配置降级方法,进行降级处理
iii.只配置限流方法,sentinel控制台配置限流规则,则请求异常报错,超出限流阈值后进行限流处理
iv.同时配置限流\降级方法,首先进行降级处理,超出阈值后进行限流处理
v.配置忽略异常,该异常请求直接报错不走降级方法

13.7.2 OpenFeign整合
依赖

ali-order-fallback模块添加依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
配置
# 激活sentinel对feign的支持
feign:
  sentinel:
    enabled: true
feign接口
package cn.dfun.demo.scdemo.ali.order.service;

import ...

@FeignClient(value="scdemo-ali-payment", fallback = PaymentFallbackService.class)
public interface PaymentService {
    @GetMapping(value="/payment/{id}")
    CommonResult<Payment> getById(@PathVariable("id") Long id);
}

fallback
package cn.dfun.demo.scdemo.ali.order.service;

import ...

@Component
public class PaymentFallbackService implements PaymentService{
    @Override
    public CommonResult<Payment> getById(Long id) {
        String msg = "PaymentFallbackService, 服务降级降级返回, id:" + id;
        return CommonResult.fail(msg);
    }
}

controller
    @GetMapping(value="/consumer/payment/{id}")
    CommonResult<Payment> getById(@PathVariable("id") Long id) {
        return paymentService.getById(id);
    }

启动,测试/payment/{id}接口调用

13.8 sentinel规则持久化

问题: 服务重启后sentinel规则会消失
持久化的nacos

依赖
 <!-- sentinel持久化 -->
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
        </dependency>
配置
spring:
  ...
  cloud:
    ...
    sentinel:
      ...
      datasource:
        ds1:
          nacos:
            server-addr: localhost:8848
            dataId: scdemo-ali-order-fallback
            groupId: DEFAULT_GROUP
            data-type: json
            rule-type: flow
nacos新建配置

dataId: scdemo-ali-order-fallback
type: json

[
    {
        "resource": "fallback",
        "limitApp": "default",
        "grade": 1,
        "count": 1,
        "strategy": 0,
        "controlBehavior": 0,
        "clusterMode": false
    }
]

启动服务,调用fallback接口,sentinel控制台会显示流控规则,服务重启后规则不消失

14.Seata-分布式事务

14.1 概述

下订单->减库存->支付
多模块,多库
解决跨系统跨数据源的数据一致性问题
官网

1 ID 3组件

1 ID+ 3 组件模型
全局的唯一事务ID
TC-事务协调者 协调
TM-事务管理器 全局事务提交或回滚
RM-资源管理器 分支事务提交或回滚

处理过程

TM向TC申请全局事务,生成唯一XID
RM向TC注册全局事务
TM向TC发起针对XID的全局提交或回滚决议
TC调度全部分支事务提交或回滚

14.2 seata配置安装

https://pan.baidu.com/s/1ZqyOC4j_bTWPbp9cPZMZcA
提取码:kqvs
这里选择0.9.0

配置

修改conf\file.conf
修改事务组名称\存储类型\mysql信息

service {
  #transaction service group mapping
  # 修改事务组名称(随便起)
  vgroupMapping.my_test_tx_group = "dfun_tx_group"
...
store {
  mode = "db"
...
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "root"
...

修改conf\registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 注册中心改为nacos
  type = "nacos"

  nacos {
    # nacos地址
    serverAddr = "localhost:8848"
...
建库

新建数据库seata,导入conf\db_store.sql

启动nacos完毕后,启动seata\bin\seata-server.bat

14.3 服务搭建

新建三个模块
scdemo-ali-seata-order 订单
scdemo-ali-seata-account 库存
scdemo-ali-seata-storage 账户

14.3.1 数据库
# 新建三个数据库seata_order \ seata_account \ seata_storage
# seata_order库创建订单表
create table t_order(
    id bigint(11) not null auto_increment primary key,
    user_id bigint(11) default null comment '用户id',
    product_id bigint(11) default null comment '产品id',
    count int(11) default null comment '数量',
    money decimal(11,0) default null comment '金额',
    status int(1) default null comment '订单状态: 0-创建中;1-已完结'
) engine=innodb auto_increment=7 default charset=utf8;

# seata_account库创建账户表
create table t_account(
id bigint(11) not null auto_increment primary key comment 'id',
user_id bigint(11) default null comment '用户id',
total decimal(10,0) default null comment '总额度',
used decimal(10,0) default null comment '已用额度',
residue decimal(10,0) default '0' comment '剩余可用额度'
)engine=innodb auto_increment=2 default charset=utf8;

# 账户表插入数据
insert into seata_account.t_account(id, user_id, total, used, residue)
values('1', '1', '1000', '0', '1000')

# seata_storage创建库存表
create table t_storage(
id bigint(11) not null auto_increment primary key,
product_id bigint(11) default null comment '产品id',
total int(11) default null comment '总库存',
used int(11) default null comment '已用库存',
residue int(11) default null comment '剩余库存'
) engine=innodb auto_increment=2 default charset=utf8; 

# 库存表插入数据
insert into seata_storage.t_storage(id, product_id, total, used, residue)
values ('1', '1', '100', '0', '100')
14.3.2 依赖配置
依赖

3个模块依赖相同
指定seata-all版本与server保持一致

      <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.dfun</groupId>
            <artifactId>scdemo-common</artifactId>
            <version>${project.version}</version>
        </dependency>
  
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--热部署插件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
    </dependencies>
配置

3个模块配置基本相同,只需修改应用名和数据库名称

server:
  port: 2003
spring:
  application:
    name: scdemo-ali-seata-account
  cloud:
    alibaba:
      seata:
        # 事务组名称,与seata server中对应
        tx-service-group: fsp_tx_group
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_account?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: root

feign:
  hystrix:
    enabled: true

logging:
  level:
    io:
      seata: info

mybatis:
  mapper-locations: classpath:mapper/*.xml
配置类

数据库连接池配置,使用seata代理
3个模块配置类相同

@Configuration
public class DatasourceProxyConfig {
    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}

mybatis配置

@Configuration
@MapperScan({"cn.dfun.demo.scdemo.ali.seata.dao"})
public class MyBatisConfig {
}
启动类添加注解

3模块启动类注解相同

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableDiscoveryClient
@EnableFeignClients
14.3.3 业务代码

核心代码(具体参考代码仓库)
当添加 @GlobalTransactional注解时,某模块异常后3个模块均回滚处理

@Override
    // 添加GlobalTransactional注解进行全局异常处理, name自定义
    @GlobalTransactional(name="fsp-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("------->开始新建订单");
        orderDao.create(order);

        log.info("------->开始微服务扣减库存");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("------->结束微服务扣减库存");

        log.info("------->开始微服务扣减余额");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("------->结束微服务扣减余额");

        log.info("------->开始修改订单状态");
        orderDao.update(order.getUserId(), 0);
        log.info("------->结束修改订单状态");

        log.info("------->下订单结束");
    }
上一篇下一篇

猜你喜欢

热点阅读