dubbo原理 & 学习

2019-08-10  本文已影响0人  suxin1932

1.Apache Dubbo

官方网址
http://dubbo.apache.org/zh-cn/docs/user/preface/background.html (使用者文档)
http://dubbo.apache.org/zh-cn/blog/download.html (源码)
https://github.com/apache/dubbo (源码)

2. 概述

2.1框架设计

来自官网
http://dubbo.apache.org/zh-cn/docs/dev/design.html

Dubbo框架整体设计.png
#图例说明:
>> 图中左边淡蓝背景的为服务消费方使用的接口,
右边淡绿色背景的为服务提供方使用的接口,
位于中轴线上的为双方都用到的接口。
>> 图中从下至上分为十层,各层均为单向依赖,
右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,
其中,Service 和 Config 层为 API,其它各层均为 SPI。
>> 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
>> 图中蓝色虚线为初始化过程,即启动时组装链,
红色实线为方法调用过程,即运行时调时链,
紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

>> service层:
接口层,给服务提供者和消费者来实现的
>> config 配置层:
对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,
可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
>> proxy 服务代理层:
服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,
扩展接口为 ProxyFactory
>> registry 注册中心层:
封装服务地址的注册与发现,以服务 URL 为中心,
扩展接口为 RegistryFactory, Registry, RegistryService
>> cluster 路由层:
封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,
扩展接口为 Cluster, Directory, Router, LoadBalance
>> monitor 监控层:
RPC 调用次数和调用时间监控,以 Statistics 为中心,
扩展接口为 MonitorFactory, Monitor, MonitorService
>> protocol 远程调用层:
封装 RPC 调用,以 Invocation, Result 为中心,
扩展接口为 Protocol, Invoker, Exporter
>> exchange 信息交换层:
封装请求响应模式,同步转异步,以 Request, Response 为中心,
扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
>> transport 网络传输层:
抽象 mina 和 netty 为统一接口,以 Message 为中心,
扩展接口为 Channel, Transporter, Client, Server, Codec
>> serialize 数据序列化层:
可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
#关系说明
>> 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter,
就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
>> 图中的 Consumer 和 Provider 是抽象概念,
只是想让看图者更直观的了解哪些类分属于客户端与服务器端,
不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 
Provider, Consumer, Registry, Monitor 划分逻辑拓普节点,保持统一概念。
>> Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,
这样其它人只要关注 Protocol 层 Invoker 即可,
加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,
因为只有一个提供者时,是不需要 Cluster 的。
>> Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,
只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,
也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
>> Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,
整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,
Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,
它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
>> Registry 和 Monitor 实际上不算一层,而是一个独立的节点,
只是为了全局概览,用层的方式画在一起。

模块说明

Dubbo模块.png Dubbo模块.png
>> dubbo-common 公共逻辑模块:
包括 Util 类和通用模型。
>> dubbo-remoting 远程通讯模块:
相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。
>> dubbo-rpc 远程调用模块:
抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
>> dubbo-cluster 集群模块:
将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,
集群的地址列表可以是静态配置的,也可以是由注册中心下发。
>> dubbo-registry 注册中心模块:
基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
>> dubbo-monitor 监控模块:
统计服务调用次数,调用时间的,调用链跟踪的服务。
>> dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。
>> dubbo-container 容器模块:
是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,
因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。
>> dubbo-plugin 插件模块
qos是dubbo的在线运维命令,dubbo 2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,
新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222,可以通过配置文件dubbo.properties修改。
[
http://dubbo.apache.org/zh-cn/docs/user/references/qos.html
http://dubbo.apache.org/zh-cn/docs/user/references/telnet.html
]

#整体上按照分层结构进行分包,与分层的不同点在于:
>> container 为服务容器,用于部署运行服务,没有在层中画出。
>> protocol 层和 proxy 层都放在 rpc 模块中,这两层是 rpc 的核心,
在不需要集群也就是只有一个提供者时,可以只使用这两层完成 rpc 调用。
>> transport 层和 exchange 层都放在 remoting 模块中,为 rpc 调用的通讯基础。
>> serialize 层放在 common 模块中,以便更大程度复用。

依赖关系

Dubbo依赖关系.png
>> 图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,
蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互。
>> 图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点。
>> 图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用。
>> 图中只包含 RPC 的层,不包含 Remoting 的层,Remoting 整体都隐含在 Protocol 中。

领域模型

#在 Dubbo 的核心领域模型中:
#1.Protocol 
是服务域,它是 Invoker 暴露和引用的主功能入口,
它负责 Invoker 的生命周期管理。
#2.Invoker 
是实体域,它是 Dubbo 的核心模型,
其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,
它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
#3.Invocation 
是会话域,它持有调用过程中的变量,比如方法名,参数等。

#其他模型 or class
#1. xxxConfig
>> 服务提供方被抽象为: ServiceBean 
>> 服务消费方被抽象为: ReferenceBean

#2.URL (公共契约)
>> URL 作为配置信息的统一格式,所有扩展点都通过传递 URL 携带配置信息。
>> 所有扩展点参数都包含 URL 参数,URL 作为上下文信息贯穿整个扩展点设计体系。
>> URL 采用标准格式:protocol://username:password@host:port/path?key=value&key=value
例如: 
>> 服务导出时, URL示例:
dubbo://192.168.0.199:20880/com.zy.dubbo.IDubboService?anyhost=true&application=dubbo-demo-provider&bean.name=ServiceBean:com.zy.dubbo.IDubboService:1.0.0&bind.ip=172.29.165.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.zy.dubbo.IDubboService&methods=dubbo&pid=20208&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1577948892519&token=true&version=1.0.0
>> 服务引用时, URL示例:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-consumer&backup=127.0.0.1:2182,127.0.0.1:2183&check=true&dubbo=2.0.2&pid=17684&qos.enable=true&registry=zookeeper&release=2.7.3&timestamp=1577948729805

Dubbo--xxxConfig.png

基本设计原则

>> 采用 Microkernel + Plugin 模式,
Microkernel 只负责组装 Plugin,Dubbo 自身的功能也是通过扩展点实现的,
也就是 Dubbo 的所有功能点都可被用户自定义扩展所替换。
>> 采用 URL 作为配置信息的统一格式,所有扩展点都通过传递 URL 携带配置信息。

2.2网络通信 & 序列化

协议 连接个数 连接方式 传输协议 传输方式 序列化 适用范围 适用场景 约束
dubbo:// 单连接 长连接 TCP NIO 异步传输 Hessian 传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。 常规远程服务方法调用 参数及返回值需实现 Serializable 接口/参数及返回值不能自定义实现 List, Map, Number, Date, Calendar 等接口,只能用 JDK 自带的实现,因为 hessian 会做特殊处理,自定义实现类中的属性值都会丢失。/Hessian 序列化,只传成员属性值和值的类型,不传方法或静态变量
rmi:// 多连接 阻塞式短连接 TCP 同步传输 jdk 传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。 常规远程服务方法调用,与原生RMI服务互操作 参数及返回值需实现 Serializable 接口/dubbo 配置中的超时时间对 RMI 无效,需使用 java 启动参数设置:-Dsun.rmi.transport.tcp.responseTimeout=3000
hessian:// 多连接 短连接 HTTP 同步传输 Hessian 传入传出参数数据包较大,提供者比消费者个数多,提供者压力较大,可传文件。 页面传输,文件传输,或与原生hessian服务互操作 同dubbo://
http:// 多连接 短连接 HTTP 同步传输 表单序列化 传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。 需同时给应用程序和浏览器 JS 使用的服务。 参数及返回值需符合 Bean 规范
webservice:// 多连接 短连接 HTTP 同步传输 SOAP 序列化 - 系统集成,跨语言调用 参数及返回值需实现 Serializable 接口/参数尽量使用基本类型和 POJO
thrift:// - - - - - - - -
memcached:// - - - - - - - -
redis:// - - - - - - - -
rest:// - - - - - - - -

2.3SPI机制

https://www.jianshu.com/p/43f529637f12

2.4集群负载均衡策略

http://dubbo.apache.org/zh-cn/docs/source_code_guide/loadbalance.html

#1)random loadbalance
默认情况下,dubbo是random load balance随机调用实现负载均衡,
可以对provider不同实例设置不同的权重,会按照权重来负载均衡,
权重越大分配流量越高,一般就用这个默认的就可以了。

#2)roundrobin loadbalance
这个的话默认就是均匀地将流量打到各个机器上去,
但是如果各个机器的性能不一样,容易导致性能差的机器负载过高。
所以此时需要调整权重,让性能差的机器承载权重小一些,流量少一些。

#3)leastactive loadbalance
这个就是自动感知一下,如果某个机器性能越差,那么接收的请求越少,
越不活跃,此时就会给不活跃的性能差的机器更少的请求

#4)consistanthash loadbalance
一致性Hash算法,相同参数的请求一定分发到一个provider上去,
provider挂掉的时候,会基于虚拟节点均匀分配剩余的流量,抖动不会太大。
如果你需要的不是随机负载均衡,是要一类请求都到一个节点,那就走这个一致性hash策略。

2.5集群容错策略

http://dubbo.apache.org/zh-cn/docs/source_code_guide/cluster.html

为了避免单点故障,现在的应用通常至少会部署在两台服务器上。
对于一些负载比较高的服务,会部署更多的服务器。
这样,在同一环境下的服务提供者数量会大于1。
对于服务消费者来说,同一环境下出现了多个服务提供者。
这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。
另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。
为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。
集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。
这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,
至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。
集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,
这样服务消费者就可以专心处理远程调用相关事宜。
比如发请求,接受服务提供者返回的数据等。
这就是集群的作用。

集群容错的组件

包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。
集群工作过程可分为两个阶段:
第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
第二个阶段是在服务消费者进行远程调用时。
以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 
Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。
Directory 的用途是保存 Invoker,可简单类比为 List<Invoker>。
其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,
它所持有的 Invoker 列表会随着注册中心内容的变化而变化。
每次变化后,RegistryDirectory 会动态增删 Invoker,
并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。
当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,
它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。
最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoker 方法,
进行真正的远程调用。
集群容错的组件.png

Dubbo 主要提供了这样几种容错方式

#1)failover cluster模式
失败自动切换,自动重试其他机器,默认就是这个,常见于读操作

#2)failfast cluster模式
一次调用失败就立即失败,常见于写操作

#3)failsafe cluster模式
出现异常时忽略掉,常用于不重要的接口调用,比如记录日志

#4)failbackc cluster模式
失败了后台自动记录请求,然后定时重发,比较适合于写消息队列这种

#5)forking cluster
并行调用多个provider,只要一个成功就立即返回

#6)broadcacst cluster
逐个调用所有的provider

2.6服务治理, 降级, 重试机制, 幂等性

研究下降级, 重试, 幂等代码实现机制, 不仅仅是dubbo的

https://www.jianshu.com/writer#/notebooks/16290018/notes/40247546/preview (幂等性)

3.代码实现 (小 demo)

https://github.com/zhangxin1932/dubbo-spring-cloud.git

4.源码解析

4.1 服务导出过程export

项目启动时, provider向 zk 注册所有@Service(dubbo的注解)的过程

// 这里的 refresh 过程, 可以参考 https://www.jianshu.com/p/5d5890645165
--> AbsctApplicationContext#refresh
    // refresh --> step5
    --> ApplicationContext#invokeBeanFactoryPostProcessors
        --> @EnableDubbo 
            --> @DubboComponentScan
                // 关于 @Import 注解的作用, 上述网址也已描述
                --> @Import(DubboComponentScanRegistrar.class)
                    --> DubboComponentScanRegistrar#registerBeanDefinitions
                        --> ServiceAnnotationBeanPostProcessor#registerServiceBeans
                            // 这一步, 根据所有加了 @org.apache.dubbo.config.annotation.Service注解的实现类的接口, 
                            // 构造多个 ServiceBean 对应的 BeanDefinition, 并将构造的不同的 ServiceBean 的 BeanDefinition, 注册进 Spring 中
                            --> ServiceAnnotationBeanPostProcessor#buildServiceBeanDefinition
                            .......
    // refresh --> step12
    --> ApplicationContext#finishRefresh
        --> AbstractApplicationContext#publishEvent(ApplicationEvent)
            // 由于 ServiceBean 实现了 ApplicationListener 接口, 其实例化后, 会调用其方法ServiceBean#onApplicationEvent
            --> ServiceBean#onApplicationEvent
                --> ServiceBean#export
                // 接下来的过程, 可以参考其官网, 解释的较为详细
                http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html
dubbo启动时netty server启动.png

provider在zk上注册的数据

provider在zk上注册的数据.png

4.2 服务引用过程refer --> 引用!!!不是服务调用!!!

项目启动时, consumer从 zk 注册所有@Reference注解

// 这里的 refresh 过程, 可以参考 https://www.jianshu.com/p/5d5890645165
--> AbsctApplicationContext#refresh
    // refresh --> step5
    --> ApplicationContext#invokeBeanFactoryPostProcessors
        --> @EnableDubbo 
            --> @DubboComponentScan
                // 关于 @Import 注解的作用, 上述网址也已描述
                --> @Import(DubboComponentScanRegistrar.class)
                    --> DubboComponentScanRegistrar#registerBeanDefinitions
                        --> DubboComponentScanRegistrar#registerReferenceAnnotationBeanPostProcessor
                            // 这一步, 注册 ReferenceAnnotationBeanPostProcessor 对应的 BeanDefinition
                            --> BeanRegistrar#registerInfrastructureBean
                            ...
    // refresh --> step11
    --> ApplicationContext#finishBeanFactoryInitialization
        --> ReferenceAnnotationBeanPostProcessor
            // 这里最终走进了下述方法, 该类实现了 Spring 的 InstantiationAwareBeanPostProcessorAdapter 接口, 会在 bean 实例化时调用
            --> AnnotationInjectedBeanPostProcessor#postProcessPropertyValues
                // 后续 Reference bean 创建流程及 NettyClient 启动流程参考下图中的调用链路即可
                ...
                --> ServiceBean#onApplicationEvent
                    --> ServiceBean#export
                    // 接下来的过程, 可以参考其官网, 解释的较为详细
                    http://dubbo.apache.org/zh-cn/docs/source_code_guide/refer-service.html
dubbo启动时netty client启动.png dubbo启动时, consumer向zk注册时, 会生成 proxy0的代理类, 生成本地存根, 供后续RPC调用.png

producer & consumer都会向zk注册自身信息

producer & consumer都会向zk注册自身信息.png
dubbo中如果采用了dubbo协议进行通信:

provider向注册中心注册信息时, 会启动 NettyServer
org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen

consumer从注册中心获取信息时, 会启动 NettyClient
org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen

4.3 Dubbo通信的编解码机制及解决粘包拆包问题 & 序列化

http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html (官网)
https://www.jianshu.com/p/249eebe64a91 (RPC调用过程)

Dubbo的网络通信框架Netty是基于TCP协议的,TCP协议的网络通信会存在粘包和拆包的问题, 原因为:
>> 当要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包
>> 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包
>> 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包
>> 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

业界的解决方法一般有以下几种:
>> 将每个数据包分为消息头和消息体,消息头中应该至少包含数据包的长度,这样接收端在接收到数据后,就知道每一个数据包的实际长度了(Dubbo就是这种方案)
>> 消息定长,每个数据包的封装为固定长度,不够补0
>> 在数据包的尾部设置特殊字符,比如FTP协议
当然Netty中提供了这些解决方案的 Handler, 如 DelimiterBasedFrameDecoder, LengthFieldBasedFrameDecoder等.

4.3.1 Dubbo消息协议头规范

此处以 Netty4为例分析.
Dubbo消息协议头规范.png
偏移量(Bit) 字段 取值
0 ~ 7 魔数高位 0xda00
8 ~ 15 魔数低位 0xbb
16 数据包类型 0 - Response, 1 - Request
17 调用方式 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18 事件标识 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23 序列化器编号 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~ 31 状态 20 - OK
30 - CLIENT_TIMEOUT
31 - SERVER_TIMEOUT
40 - BAD_REQUEST
50 - BAD_RESPONSE
......
32 ~ 95 请求编号 共8字节,运行时生成
96 ~ 127 消息体长度 运行时计算
dubbo的消息头是一个定长的 16个字节的数据包:
>> magic High(第0-7位) & Magic Low(第8-15位):
共2byte, 类似java字节码文件里的魔数,用来判断是不是dubbo协议的数据包,就是一个固定的数字
>> Serialization id(序列id):
第16-20位, 共1byte
>> event: 第21位
>> two way: 第22位, 一个标志位,是单向的还是双向的
>> Req/res: 第23位, 请求或响应标识
>> status: 第24-31位, 共1byte.状态位,设置请求响应状态,request为空,response才有值.
>> Id(long):第32-95位, 共8byte.每一个请求的唯一识别id
由于采用异步通讯的方式,用来把请求request和返回的response对应上.
>> data length: 第96-127位, 共4byte, 消息体长度,int 类型

Dubbo采用消息头和消息体的方式来解决粘包拆包,
并在消息头中放入了一个唯一Id来解决异步通信关联request和response的问题.

4.3.2 请求(编码/解码)----响应(编码-解码)

4.3.2.1 初始化Codec2(编解码器)

org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter

// 关于 Dubbo中NettyServer, NettyClient 启动详见上文.
// NettyServer#doOpen, 或 NettyClient#decode 启动定义 Handler 时, 走到这里, 根据 URL 获取 Codec, 默认为 DubboCountCodec, Netty4.
--> AbstractEndpoint#getChannelCodec
    // 初始化 Codec, 默认为 DubboCountCodec
    --> AbstractEndpoint#AbstractEndpoint
        // NettyServer#doOpen 及 NettyClient#doOpen 中, 初始化了 Codec --> 即为 DubboCountCodec
        // pipeline.addLast("decoder", adapter.getDecoder()) --> org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder
        // pipeline.addLast("encoder", adapter.getEncoder()) --> org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder
        // 当请求或响应时, 会进入这两个 Internalxxx 中, 调用:
        // codec.encode --> 对应 DubboCountCodec.encode, 或 codec.decode --> 对应 DubboCountCodec.decode
        --> NettyCodecAdapter#NettyCodecAdapter

4.3.2.2 请求 & 响应编解码大致流程

// 请求 编码过程
--> NettyCodecAdapter.InternalEncoder#encode
    --> DubboCountCodec#encode
        --> ExchangeCodec#encode
            --> ExchangeCodec#encodeRequest
                --> DubboCodec#encodeRequestData(Channel, ObjectOutput, Object, String)
    
// 响应 编码过程
--> NettyCodecAdapter.InternalEncoder#encode
    --> DubboCountCodec#encode
        --> ExchangeCodec#encode
            --> ExchangeCodec#encodeResponse
                --> DubboCodec#encodeResponseData(Channel, ObjectOutput, Object, String)
    
// 请求 & 响应 解码过程
--> NettyCodecAdapter.InternalDecoder#decode
    --> DubboCountCodec#decode
        --> ExchangeCodec#decode(Channel, ChannelBuffer)
            --> ExchangeCodec#decode(Channel, ChannelBuffer, int, byte[])
                --> ExchangeCodec#decodeBody

4.3.2.3 源码解析

NettyCodecAdapter --> Netty编解码器适配器类

package org.apache.dubbo.remoting.transport.netty4;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import java.util.List;

/**
 * Netty 编解码器适配器, 最终是通过 ExtensionLoader, 即 SPI 来动态添加 Codec2 的, 可修改默认配置
 */
final public class NettyCodecAdapter {
    private final ChannelHandler encoder = new InternalEncoder();
    private final ChannelHandler decoder = new InternalDecoder();
    private final Codec2 codec;
    private final URL url;
    private final org.apache.dubbo.remoting.ChannelHandler handler;
    public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }
    public ChannelHandler getEncoder() {
        return encoder;
    }
    public ChannelHandler getDecoder() {
        return decoder;
    }
    // 编码器
    private class InternalEncoder extends MessageToByteEncoder {
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            // 获取 buffer
            org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            // 获取 channel
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                // 对 msg 进行编码
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
        }
    }
    // 解码器
    private class InternalDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            // 获取 buffer
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            // 获取 channel
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                // 解码
                do {
                    int saveReaderIndex = message.readerIndex();
                    Object msg = codec.decode(channel, message);
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }
}

DubboCountCodec --> 默认的编解码器

package org.apache.dubbo.rpc.protocol.dubbo;

import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcInvocation;
import java.io.IOException;
import static org.apache.dubbo.rpc.Constants.INPUT_KEY;
import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;

public final class DubboCountCodec implements Codec2 {
    // 实际的编解码器为 DubboCodec (其 extends ExchangeCodec) 
    private DubboCodec codec = new DubboCodec();
    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        // 编码, 透传至 ExchangeCodec#encode
        codec.encode(channel, buffer, msg);
    }
    // 解码
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            // 这里走进了 ExchangeCodec.decode
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }

    private void logMessageLength(Object result, int bytes) {
        if (bytes <= 0) {
            return;
        }
        if (result instanceof Request) {
            try {
                ((RpcInvocation) ((Request) result).getData()).setAttachment(INPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        } else if (result instanceof Response) {
            try {
                ((AppResponse) ((Response) result).getResult()).setAttachment(OUTPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        }
    }

}

ExchangeCodec#encodeRequest --> 请求编码(响应是另一相似方法)

// 请求编码
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    // 获取序列化方式,默认是Hessian序列化, 这里也是通过 ExtensionLoader 来获取的序列化方式
    Serialization serialization = getSerialization(channel);
    // new 了一个 16 字节(即 128 位)的 byte 数组,就是 request 的消息头
    byte[] header = new byte[HEADER_LENGTH];
    // 往消息头中 set magic number,前 2 个 byte 已经填充
    Bytes.short2bytes(MAGIC, header);
    // 往消息头中 set request and serialization flag, 第三个byte已经填充
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    // 如果请求是 twoWay, 则将 第三个 byte 与 (byte) 0x40 进行 |= 计算
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    // 如果请求是 event, 则将 第三个 byte 与 (byte) 0x20 进行 |= 计算
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }

    // set request id.这个时候是 0
    Bytes.long2bytes(req.getId(), header, 4);

    // 编码 request data.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    // 序列化数据
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        // 编码事件数据
        encodeEventData(channel, out, req.getData());
    } else {
        // 编码消息体数据
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    // 校验 payload 是否超过指定长度, 默认为 8 * 1024 * 1024, 可在 URL 中设置 payload, 改变其大小
    checkPayload(channel, len);
    // 在消息头中设置消息体长度
    Bytes.int2bytes(len, header, 12);

    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

4.4 消费方发起调用过程

http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html (官网)

// 在此之前, 通过上述4.2 中生成的 proxy0 找到对应地址的 InvokerInvocationHandler
// 这一步, 将请求封装为 RpcInvocation, 调用 recreate 方法封装异步请求结果
// return invoker.invoke(new RpcInvocation(method, args)).recreate();
--> InvokerInvocationHandler#invoke
    // 这里进入集群 (cluster): 判断是否有 mock 请求
    --> MockClusterInvoker#invoke
        // 由于此处无 mock, 请求进入下述流程
        --> AbstractClusterInvoker#invoke
            // 获取所有的 invokers
            --> AbstractClusterInvoker#list
                // 这里进入目录 (Directory) 查找 invokers
                --> AbstractDirectory#list                  
                    // invokers = routerChain.route(getConsumerUrl(), invocation);
                    --> RegistryDirectory#doList
                        // 这里进入路由 (route), 将传入的invokers和设置的路由规则匹配,获得符合条件的invokers返回
                        --> RouterChain#route
                            --> MockInvokersSelector#route
                                --> MockInvokersSelector#getNormalInvokers
                // 这里进入负载均衡 (loadbalance), 通过 ExtensionLoader 加载 SPI 中的 loadbalance
                --> AbstractClusterInvoker#initLoadBalance              
            // 发起调用
            --> FailoverClusterInvoker#doInvoke
                // 通过上一步负载均衡 (loadbalance) 及获取的 invokers, 选取其中一个 invoker, 进行调用
                // 若只有一个 invoker, 则直接返回
                --> AbstractClusterInvoker#select
                    --> AbstractClusterInvoker#doSelect
                        // 这一步真正通过 上述 SPI 配置的 loadbalance 算法, 进行 loadbalance, 返回一个 invoker
                        --> AbstractLoadBalance#select
                --> InvokerWrapper#invoke
                    ... 这里进行各种 filterWrapper, filter 的调用
                        --> ListenerInvokerWrapper#invoke
                            --> AsyncToSyncInvoker#invoke
                                --> AbstractInvoker#invoke
                                    --> DubboInvoker#doInvoke
                                        --> ReferenceCountExchangeClient#request(Object, int)
                                            --> HeaderExchangeClient#request(Object, int)
                                                --> HeaderExchangeChannel#request(Object, int)
                                                    --> AbstractPeer#send
                                                        --> AbstractClient#send
                                                            --> NettyChannel#send
                                                                ... 这里进行 netty4 的 channel.writeAndFlush
client-request.png

调用链

调用链.png

4.4 生产方接收请求及处理过程

http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html (官网)

调用逻辑
—> NettyServerHandler#channelRead
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由线程池执行后续的
            —> ChannelEventRunnable#run()
              —> DecodeHandler#received(Channel, Object)
                —> HeaderExchangeHandler#received(Channel, Object)
                  —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
                    —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
                      —> Filter#invoke(Invoker, Invocation)
                        —> AbstractProxyInvoker#invoke(Invocation)
                            —> JavassistProxyFactory#getInvoker
                              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                                —> DemoServiceImpl#sayHello(String)

4.4.1 线程派发模型

#概述
Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。
如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,
此时直接在 IO 线程上执行该段逻辑即可。
但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。
此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。
原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。

#Dispatcher
Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略
Dubbo调用过程中--Dispatcher线程派发器.png
策略 用途
all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行
message 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行
execution 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池
默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中。
请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。
ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,
仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。
DecodeHandler 主要是包含了一些解码逻辑。
解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。

4.6 DefaultFuture 中 FUTURES 保存调用结果 及同步异步问题

http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html (官网)

#同步异步概述
Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。
同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。
异步调用模式下,则由用户调用该方法。
ResponseFuture 是一个接口,它的默认实现类是 DefaultFuture。

当服务消费者还未接收到调用结果时,用户线程调用 get 方法会被阻塞住。
同步调用模式下,框架获得 DefaultFuture 对象后,会立即调用 get 方法进行等待。
而异步模式下则是将该对象封装到 FutureAdapter 实例中,
并将 FutureAdapter 实例设置到 RpcContext 中,供用户使用。
FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。
这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配,
最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法。
#异步模式下
一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,
会调用不同 DefaultFuture 对象的 get 方法进行等待。 
一段时间后,服务消费方的线程池会收到多个响应对象。

这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。
答案是通过调用编号。

DefaultFuture 被创建时,会要求传入一个 Request 对象。
此时 DefaultFuture 可从 Request 对象中获取调用编号,
并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。
线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号,
到 FUTURES 集合中取出相应的 DefaultFuture 对象,
然后再将 Response 对象设置到 DefaultFuture 对象中。
最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。
DefaultFuture保存调用结果.png

5.扩展点分析

5.1 SPI 机制及 @Activate & @Adaptive 应用原理

https://www.jianshu.com/p/43f529637f12

参考资源
https://www.cnblogs.com/simoncook/p/9570535.html (服务查找过程)
https://yq.aliyun.com/articles/69793 (多注册中心配置)
https://www.cnblogs.com/sinxsoft/p/4984321.html (多注册中心配置)
https://blog.csdn.net/qq_27529917/article/details/80632078 (dubbo-zk)

上一篇下一篇

猜你喜欢

热点阅读