Sentinel之集群限流源码分析(二)
接下来分析sentinel-cluster-server-default模块中server包下的内容。
1、源码目录结构
(图:源码目录结构)
2、源码分析
(图:分析内容)
2.1 编解码codec
2.1.1 ServerEntityCodecProvider
编解码提供工具类,用于获取RequestEntityDecoder与ResponseEntityWriter;
源码如下:
public final class ServerEntityCodecProvider {
private static RequestEntityDecoder requestEntityDecoder = null;
private static ResponseEntityWriter responseEntityWriter = null;
static {
resolveInstance();
}
private static void resolveInstance() {
ResponseEntityWriter writer = SpiLoader.loadFirstInstance(ResponseEntityWriter.class);
if (writer == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing response entity writer, resolve failed");
} else {
responseEntityWriter = writer;
RecordLog.info(
"[ServerEntityCodecProvider] Response entity writer resolved: " + responseEntityWriter.getClass()
.getCanonicalName());
}
RequestEntityDecoder decoder = SpiLoader.loadFirstInstance(RequestEntityDecoder.class);
if (decoder == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing request entity decoder, resolve failed");
} else {
requestEntityDecoder = decoder;
RecordLog.info(
"[ServerEntityCodecProvider] Request entity decoder resolved: " + requestEntityDecoder.getClass()
.getCanonicalName());
}
}
public static RequestEntityDecoder getRequestEntityDecoder() {
return requestEntityDecoder;
}
public static ResponseEntityWriter getResponseEntityWriter() {
return responseEntityWriter;
}
}
实例变量
- requestEntityDecoder:客户端请求解码器
- responseEntityWriter:服务端响应编码器
静态方法
- resolveInstance:通过SPI机制获取requestEntityDecoder与responseEntityWriter的实现类;并保存在实例变量中。
- requestEntityDecoder与responseEntityWriter的实现类分别是DefaultRequestEntityDecoder、DefaultResponseEntityWriter;实现类在META-INF/services目录下的配置文件中指定。
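为了更直观地理解这种加载方式,下面给出一个基于标准Java ServiceLoader的最小示意(SpiLoader内部即是对该机制的封装;示例中的接口、实现类与包名均为演示用的假设命名,并非Sentinel中的真实类):
package demo.spi;

import java.util.Iterator;
import java.util.ServiceLoader;

public class SpiDemo {
    // 假设的编码器接口
    public interface EntityWriterSpi {
        String name();
    }

    // 假设的默认实现,需在META-INF/services中声明:
    // 文件名: META-INF/services/demo.spi.SpiDemo$EntityWriterSpi
    // 文件内容: demo.spi.SpiDemo$DefaultEntityWriterSpi
    public static class DefaultEntityWriterSpi implements EntityWriterSpi {
        @Override
        public String name() {
            return "default-writer";
        }
    }

    // 与SpiLoader.loadFirstInstance的效果类似:取第一个可用实现,取不到则返回null
    public static EntityWriterSpi loadFirstInstance() {
        Iterator<EntityWriterSpi> it = ServiceLoader.load(EntityWriterSpi.class).iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        EntityWriterSpi writer = loadFirstInstance();
        System.out.println(writer == null ? "no impl found" : writer.name());
    }
}
把实现类的全限定名写入对应的services文件后,ServiceLoader即可在运行时发现它;Sentinel正是通过这种方式把DefaultRequestEntityDecoder、DefaultResponseEntityWriter暴露给SpiLoader的。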
2.1.2 DefaultRequestEntityDecoder
默认的ClusterRequest对象解码器
解码格式如下:
+--------+---------+---------+
| xid(4) | type(1) | data... |
+--------+---------+---------+
源码
@Override
public ClusterRequest decode(ByteBuf source) {
//1. 判断可读的字节数是否不少于5个字节(xid 4字节 + type 1字节)
if (source.readableBytes() >= 5) {
//2. 获取xid:4个字节
int xid = source.readInt();
//3. 获取type 1个字节;三种类型:PING、FLOW、PARAM_FLOW:下文会讲
int type = source.readByte();
//4. 从注册器中获取具体类型的解码器
EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
if (dataDecoder == null) {
RecordLog.warn("Unknown type of request data decoder: {0}", type);
return null;
}
Object data;
//5. 再次确认下可读取的字节数
if (source.readableBytes() == 0) {
data = null;
} else {
// 6. 读取数据
data = dataDecoder.decode(source);
}
return new ClusterRequest<>(xid, type, data);
}
return null;
}
2.1.3 DefaultResponseEntityWriter
默认的ClusterResponse响应编码器
源码
@Override
public void writeTo(ClusterResponse response, ByteBuf out) {
//1. 获取响应类型及具体的响应数据编码器
int type = response.getType();
EntityWriter<Object, ByteBuf> responseDataWriter = ResponseDataWriterRegistry.getWriter(type);
if (responseDataWriter == null) {
writeHead(response.setStatus(ClusterConstants.RESPONSE_STATUS_BAD), out);
RecordLog.warn("[NettyResponseEncoder] Cannot find matching writer for type <{0>", response.getType());
return;
}
// 2. 写入头部数据
writeHead(response, out);
// 3. 写入数据包
responseDataWriter.writeTo(response.getData(), out);
}
private void writeHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(response.getType());
//状态,如错误状态会写入
out.writeByte(response.getStatus());
}
2.1.4 registry
- RequestDataDecodeRegistry:RequestData解码注册器
- ResponseDataWriterRegistry:ResponseData编码注册器
源码
public final class RequestDataDecodeRegistry {
//保存EntityDecoder的Map,key是类型type
private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();
public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
if (DECODER_MAP.containsKey(type)) {
return false;
}
DECODER_MAP.put(type, decoder);
return true;
}
public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
}
public static boolean removeDecoder(int type) {
return DECODER_MAP.remove(type) != null;
}
}
public final class ResponseDataWriterRegistry {
//保存EntityWriter的Map,key是类型type
private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();
public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
if (WRITER_MAP.containsKey(type)) {
return false;
}
WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
return true;
}
public static EntityWriter<Object, ByteBuf> getWriter(int type) {
return WRITER_MAP.get(type);
}
public static boolean remove(int type) {
return WRITER_MAP.remove(type) != null;
}
}
源码很简单,可以理解为就是一个工具类;那么Map中的编码器、解码器是在什么地方注册的呢?在后面的init包源码中会分析。
2.1.5 netty
这个包下其实就是handler类了,会放在ChannelPipeline中;在NettyTransportServer启动类中可以看到。
源码分析
public class NettyRequestDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//1. 获取RequestEntityDecoder
RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder();
if (requestDecoder == null) {
RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, "
+ "dropping the request");
return;
}
// TODO: handle decode error here.
// 2. 请求数据解码
Request request = requestDecoder.decode(in);
if (request != null) {
out.add(request);
}
}
}
public class NettyResponseEncoder extends MessageToByteEncoder<ClusterResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, ClusterResponse response, ByteBuf out) throws Exception {
//1. 获取ResponseEntityWriter
ResponseEntityWriter<ClusterResponse, ByteBuf> responseEntityWriter = ServerEntityCodecProvider.getResponseEntityWriter();
if (responseEntityWriter == null) {
RecordLog.warn("[NettyResponseEncoder] Cannot resolve the global response entity writer, reply bad status");
//写入错误状态数据
writeBadStatusHead(response, out);
return;
}
//2. 写入数据
responseEntityWriter.writeTo(response, out);
}
private void writeBadStatusHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(ClusterConstants.RESPONSE_STATUS_BAD);
out.writeByte(response.getStatus());
}
}
- NettyRequestDecoder类继承ByteToMessageDecoder,而ByteToMessageDecoder又继承ChannelInboundHandlerAdapter;熟悉netty的读者知道,通过继承ChannelInboundHandlerAdapter可以自定义入站处理器(handler)
- NettyResponseEncoder继承MessageToByteEncoder,MessageToByteEncoder继承ChannelOutboundHandlerAdapter,属于出站处理器(pipeline的装配位置可参考下面的示意)
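下面给出一个pipeline装配的简单示意,说明拆包、编解码与业务handler的大致位置;真实的装配逻辑在NettyTransportServer中,handler的具体顺序与参数以源码为准,示例中的拆包参数与占位handler均为假设:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LoggingHandler;

/**
 * token server侧pipeline装配顺序的示意(参数与占位handler仅作演示)。
 */
public class ServerPipelineSketch extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        // 1. 入站方向先做拆包,保证decoder拿到完整的一帧(此处参数仅为示意)
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        // 2. 出站方向补上长度前缀
        p.addLast(new LengthFieldPrepender(2));
        // 3. 这里对应NettyRequestDecoder(入站:字节 -> ClusterRequest)
        //    与NettyResponseEncoder(出站:ClusterResponse -> 字节)
        // 4. 最后是业务handler(如TokenServerHandler),此处用LoggingHandler占位
        p.addLast(new LoggingHandler());
    }
}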
2.1.6 data
该包下就是EntityWriter和EntityDecoder的实现类了;其中EntityDecoder的子类有FlowRequestDataDecoder,ParamFlowRequestDataDecoder,PingRequestDataDecoder;EntityWriter的子类有FlowResponseDataWriter,PingResponseDataWriter。
2.1.6.1 对请求数据解码
主要有FlowRequestDataDecoder、ParamFlowRequestDataDecoder、PingRequestDataDecoder三个类,用于对ClusterRequest请求携带的数据进行解码。
- FlowRequestDataDecoder
普通限流请求数据解码器,解码后的对象是FlowRequestData,可配合FlowRequestDataWriter对照查看
| flow ID (8) | count (4) | priority flag (1) |
源码
public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowRequestData> {
@Override
public FlowRequestData decode(ByteBuf source) {
// 字节数判断,至少需要12个字节(flowId 8字节 + count 4字节)
if (source.readableBytes() >= 12) {
FlowRequestData requestData = new FlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
//判断是否有priority属性
if (source.readableBytes() >= 1) {
requestData.setPriority(source.readBoolean());
}
return requestData;
}
return null;
}
}
- ParamFlowRequestDataDecoder
热点参数解码器,解码后的对象是ParamFlowRequestData,配合ParamFlowRequestDataWriter查看
| flow ID (8) | count (4) | param count (4) |
源码
public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> {
@Override
public ParamFlowRequestData decode(ByteBuf source) {
//至少需要16个字节
//| flow ID (8) | count (4) | param count (4) |
if (source.readableBytes() >= 16) {
ParamFlowRequestData requestData = new ParamFlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
//热点参数个数
int amount = source.readInt();
if (amount > 0) {
List<Object> params = new ArrayList<>(amount);
for (int i = 0; i < amount; i++) {
//解析热点参数
decodeParam(source, params);
}
requestData.setParams(params);
return requestData;
}
}
return null;
}
private boolean decodeParam(ByteBuf source, List<Object> params) {
//热点参数类型
byte paramType = source.readByte();
switch (paramType) {
//int
case ClusterConstants.PARAM_TYPE_INTEGER:
params.add(source.readInt());
return true;
//string
case ClusterConstants.PARAM_TYPE_STRING:
//先读取字符串的字节长度
int length = source.readInt();
byte[] bytes = new byte[length];
//读取字符
source.readBytes(bytes);
// TODO: take care of charset?
params.add(new String(bytes));
return true;
//boolean
case ClusterConstants.PARAM_TYPE_BOOLEAN:
params.add(source.readBoolean());
return true;
//double
case ClusterConstants.PARAM_TYPE_DOUBLE:
params.add(source.readDouble());
return true;
//long
case ClusterConstants.PARAM_TYPE_LONG:
params.add(source.readLong());
return true;
//float
case ClusterConstants.PARAM_TYPE_FLOAT:
params.add(source.readFloat());
return true;
//byte
case ClusterConstants.PARAM_TYPE_BYTE:
params.add(source.readByte());
return true;
//short
case ClusterConstants.PARAM_TYPE_SHORT:
params.add(source.readShort());
return true;
default:
return false;
}
}
}
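针对其中String类型参数的 | paramType(1) | length(4) | bytes | 编码方式,下面给出一个独立的小示例,按decodeParam的读取顺序构造并解析一个字符串参数;示例中paramType取值2仅为示意,实际以ClusterConstants中的常量为准:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class ParamEncodingDemo {
    public static void main(String[] args) {
        // 以一个String类型热点参数为例:| paramType(1) | length(4) | bytes(length) |
        ByteBuf buf = Unpooled.buffer();
        byte[] bytes = "userA".getBytes(StandardCharsets.UTF_8);
        buf.writeByte(2);            // 假设2代表PARAM_TYPE_STRING,仅为示意
        buf.writeInt(bytes.length);  // 字符串字节长度
        buf.writeBytes(bytes);       // 字符串内容

        // 读回:先读类型,再读长度,最后读内容,与decodeParam的顺序一致
        byte paramType = buf.readByte();
        int length = buf.readInt();
        byte[] read = new byte[length];
        buf.readBytes(read);
        System.out.println("type=" + paramType + ", param=" + new String(read, StandardCharsets.UTF_8));

        buf.release();
    }
}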
- PingRequestDataDecoder
PING(心跳)请求的数据解码器。
2.1.6.2 响应数据编码
有FlowResponseDataWriter、PingResponseDataWriter
- FlowResponseDataWriter:Server端响应编码器,需要配合FlowResponseDataDecoder查看
源码
public class FlowResponseDataWriter implements EntityWriter<FlowTokenResponseData, ByteBuf> {
@Override
public void writeTo(FlowTokenResponseData entity, ByteBuf out) {
//剩余可用的token数
out.writeInt(entity.getRemainingCount());
//等待时间
out.writeInt(entity.getWaitInMs());
}
}
- PingResponseDataWriter
PING(心跳)响应的数据编码器。
2.2 配置config
这个包下主要是server的一些配置,包括常量配置、动态配置等。
2.2.1 ServerTransportConfigObserver接口
源码
public interface ServerTransportConfigObserver {
/**
* Callback on server transport config (e.g. port) change.
*
* @param config new server transport config
*/
//定义了一个回调方法,用于传输配置变更时通知
void onTransportConfigChange(ServerTransportConfig config);
}
2.2.2 ServerTransportConfig
实例变量
- port:端口
- idleSeconds:连接最大空闲时间(秒)
2.2.3 ServerFlowConfig
服务流控规则配置及默认值,看源码
源码
public static final double DEFAULT_EXCEED_COUNT = 1.0d;
public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d;
public static final int DEFAULT_INTERVAL_MS = 1000;
public static final int DEFAULT_SAMPLE_COUNT= 10;
public static final double DEFAULT_MAX_ALLOWED_QPS= 30000;
private final String namespace;
//超过数
private double exceedCount = DEFAULT_EXCEED_COUNT;
//最大比例
private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO;
//间隔ms
private int intervalMs = DEFAULT_INTERVAL_MS;
//采样个数
private int sampleCount = DEFAULT_SAMPLE_COUNT;
//最大允许qps
private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS;
2.2.4 ClusterServerConfigManager
集群流控配置管理器
实例变量及其默认值
private static boolean embedded = false;
/**
* Server global transport and scope config.
* 全局的服务端传送配置
*/
private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT;
private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS;
private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
/**
* Server global flow config.
* 服务端流控配置
*/
private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT;
private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO;
private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS;
private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT;
private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS;
动态配置初始化
/**
* Namespace-specific flow config for token server.
* Format: (namespace, config).
*/
//token server的命名配置
private static final Map<String, ServerFlowConfig> NAMESPACE_CONF = new ConcurrentHashMap<>();
//服务端Transport观察者
private static final List<ServerTransportConfigObserver> TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>();
/**
* Property for cluster server global transport configuration.
*/
//传输动态配置
private static SentinelProperty<ServerTransportConfig> transportConfigProperty = new DynamicSentinelProperty<>();
/**
* Property for cluster server namespace set.
*/
//namespace动态配置
private static SentinelProperty<Set<String>> namespaceSetProperty = new DynamicSentinelProperty<>();
/**
* Property for cluster server global flow control configuration.
*/
//流控规则动态配置
private static SentinelProperty<ServerFlowConfig> globalFlowProperty = new DynamicSentinelProperty<>();
//配置监听者
private static final PropertyListener<ServerTransportConfig> TRANSPORT_PROPERTY_LISTENER
= new ServerGlobalTransportPropertyListener();
private static final PropertyListener<ServerFlowConfig> GLOBAL_FLOW_PROPERTY_LISTENER
= new ServerGlobalFlowPropertyListener();
private static final PropertyListener<Set<String>> NAMESPACE_SET_PROPERTY_LISTENER
= new ServerNamespaceSetPropertyListener();
//启动时加载,增加监听器
static {
transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER);
globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER);
namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER);
}
//省略部分代码
//增加传送变更配置变更者
public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) {
AssertUtil.notNull(observer, "observer cannot be null");
TRANSPORT_CONFIG_OBSERVERS.add(observer);
}
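这些动态配置的核心是SentinelProperty与PropertyListener的配合:数据源(或手工)调用updateValue推送新值,监听器的configUpdate随即被回调。下面是一个脱离ClusterServerConfigManager的最小示意,用Integer代替ServerTransportConfig来演示这一机制:
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;

public class PropertyListenerDemo {
    public static void main(String[] args) {
        // 用Integer代替ServerTransportConfig做演示:监听"端口"这一配置项
        SentinelProperty<Integer> portProperty = new DynamicSentinelProperty<>();
        portProperty.addListener(new PropertyListener<Integer>() {
            @Override
            public void configUpdate(Integer value) {
                // 对应ServerGlobalTransportPropertyListener.configUpdate -> applyConfig
                System.out.println("port updated to: " + value);
            }

            @Override
            public void configLoad(Integer value) {
                System.out.println("port loaded: " + value);
            }
        });

        // 推送新值时,已注册的监听器会被回调
        portProperty.updateValue(11111);
        portProperty.updateValue(22222);
    }
}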
监听器内部类
//namespace 监听器
private static class ServerNamespaceSetPropertyListener implements PropertyListener<Set<String>> {
@Override
public synchronized void configLoad(Set<String> set) {
if (set == null || set.isEmpty()) {
RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set");
return;
}
//更新
applyNamespaceSetChange(set);
}
@Override
public synchronized void configUpdate(Set<String> set) {
// TODO: should debounce?
applyNamespaceSetChange(set);
}
}
private static void applyNamespaceSetChange(Set<String> newSet) {
if (newSet == null) {
return;
}
RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet);
if (newSet.isEmpty()) {
ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
return;
}
newSet = new HashSet<>(newSet);
// Always add the `default` namespace to the namespace set.
newSet.add(ServerConstants.DEFAULT_NAMESPACE);
if (embedded) {
// In embedded server mode, the server itself is also a part of service,
// so it should be added to namespace set.
// By default, the added namespace is the appName.
//嵌入模式也需要增加
newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get());
}
//更新
Set<String> oldSet = ClusterServerConfigManager.namespaceSet;
if (oldSet != null && !oldSet.isEmpty()) {
for (String ns : oldSet) {
// Remove the cluster rule property for deprecated namespace set.
if (!newSet.contains(ns)) {
ClusterFlowRuleManager.removeProperty(ns);
ClusterParamFlowRuleManager.removeProperty(ns);
}
}
}
ClusterServerConfigManager.namespaceSet = newSet;
//注册规则属性,在后面会讲解
for (String ns : newSet) {
// Register the rule property if needed.
ClusterFlowRuleManager.registerPropertyIfAbsent(ns);
ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns);
// Initialize the global QPS limiter for the namespace.
GlobalRequestLimiter.initIfAbsent(ns);
}
}
//global transport传输配置监听器
private static class ServerGlobalTransportPropertyListener implements PropertyListener<ServerTransportConfig> {
@Override
public void configLoad(ServerTransportConfig config) {
if (config == null) {
RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config");
return;
}
applyConfig(config);
}
@Override
public void configUpdate(ServerTransportConfig config) {
//应用配置
applyConfig(config);
}
private synchronized void applyConfig(ServerTransportConfig config) {
//校验配置
if (!isValidTransportConfig(config)) {
RecordLog.warn(
"[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config);
return;
}
RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config);
if (config.getIdleSeconds() != idleSeconds) {
idleSeconds = config.getIdleSeconds();
}
//更新token server的传输配置(端口),并通知观察者
updateTokenServer(config);
}
}
private static void updateTokenServer(ServerTransportConfig config) {
int newPort = config.getPort();
AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)");
if (newPort == port) {
return;
}
ClusterServerConfigManager.port = newPort;
for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) {
//更新,通过观察者模式更新
observer.onTransportConfigChange(config);
}
}
//globalFlow流控监听器
private static class ServerGlobalFlowPropertyListener implements PropertyListener<ServerFlowConfig> {
@Override
public void configUpdate(ServerFlowConfig config) {
//更新
applyGlobalFlowConfig(config);
}
@Override
public void configLoad(ServerFlowConfig config) {
applyGlobalFlowConfig(config);
}
private synchronized void applyGlobalFlowConfig(ServerFlowConfig config) {
//校验规则
if (!isValidFlowConfig(config)) {
RecordLog.warn(
"[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config);
return;
}
RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config);
//判断有没有更新
if (config.getExceedCount() != exceedCount) {
exceedCount = config.getExceedCount();
}
if (config.getMaxOccupyRatio() != maxOccupyRatio) {
maxOccupyRatio = config.getMaxOccupyRatio();
}
if (config.getMaxAllowedQps() != maxAllowedQps) {
maxAllowedQps = config.getMaxAllowedQps();
//调用GlobalRequestLimiter设置qps变更
GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps);
}
int newIntervalMs = config.getIntervalMs();
int newSampleCount = config.getSampleCount();
if (newIntervalMs != intervalMs || newSampleCount != sampleCount) {
if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) {
RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count");
} else {
intervalMs = newIntervalMs;
sampleCount = newSampleCount;
// Reset all the metrics.
//重置统计指标
ClusterMetricStatistics.resetFlowMetrics();
ClusterParamMetricStatistics.resetFlowMetrics();
}
}
}
}
public static boolean isValidTransportConfig(ServerTransportConfig config) {
return config != null && config.getPort() > 0 && config.getPort() <= 65535;
}
public static boolean isValidFlowConfig(ServerFlowConfig config) {
return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0
&& config.getMaxAllowedQps() >= 0
&& FlowRuleUtil.isWindowConfigValid(config.getSampleCount(), config.getIntervalMs());
}
2.3 连接器connection
这个包下主要就是集群链接器、连接器组的管理。
2.3.1 Connection接口
源码
public interface Connection extends AutoCloseable {
//获取SocketAddress
SocketAddress getLocalAddress();
//port
int getRemotePort();
//ip
String getRemoteIP();
//刷新readTime
void refreshLastReadTime(long lastReadTime);
//获取lastReadTime
long getLastReadTime();
//获取链接的key
String getConnectionKey();
}
Connection继承了AutoCloseable接口,可以配合try-with-resources语法自动释放资源;JDK 1.7之后文件流等类也实现了该接口。
接口定义了6个方法,由子类实现。
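下面用一个假设的FakeConnection做最小示意,演示实现AutoCloseable后如何配合try-with-resources自动释放资源(与Sentinel的NettyConnection无关,仅说明语法):
public class AutoCloseableDemo {
    // 模拟Connection:实现AutoCloseable后即可用于try-with-resources
    static class FakeConnection implements AutoCloseable {
        void send(String msg) {
            System.out.println("send: " + msg);
        }

        @Override
        public void close() {
            // try块结束时自动调用,对应NettyConnection.close()中的资源清理
            System.out.println("connection closed");
        }
    }

    public static void main(String[] args) {
        try (FakeConnection conn = new FakeConnection()) {
            conn.send("ping");
        } // 这里无需手动close
    }
}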
2.3.2 NettyConnection
实例变量
- remoteIp:远程ip
- remotePort:远程port
- channel:Netty的Channel通道
- lastReadTime:上次读取数据的时间
- pool:所属的连接池ConnectionPool
源码
//Netty链接器
public class NettyConnection implements Connection {
private String remoteIp;
private int remotePort;
private Channel channel;
private long lastReadTime;
private ConnectionPool pool;
//构造器,需要传入channel以及pool;ConnectionPool下面会说
public NettyConnection(Channel channel, ConnectionPool pool) {
this.channel = channel;
this.pool = pool;
//获取socketAddress
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
this.remoteIp = socketAddress.getAddress().getHostAddress();
this.remotePort = socketAddress.getPort();
this.lastReadTime = System.currentTimeMillis();
}
@Override
public SocketAddress getLocalAddress() {
return channel.localAddress();
}
//省略部分代码
@Override
public String getConnectionKey() {
//ip:port
return remoteIp + ":" + remotePort;
}
@Override
//实现了AutoCloseable的close方法,可以自动关闭资源
public void close() {
// Remove from connection pool.
pool.remove(channel);
// Close the connection.
if (channel != null && channel.isActive()){
channel.close();
}
}
}
2.3.3 ConnectionPool
通用连接池,用于管理连接。
实例变量
- TIMER:初始化了一个可定时执行的线程执行器,核心线程2个
- CONNECTION_MAP:保存连接的Map,格式为("ip:port", connection)
- scanTaskFuture:定期扫描任务
源码
public class ConnectionPool {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);
/**
* Format: ("ip:port", connection)
*/
private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
/**
* Periodic scan task.
*/
private ScheduledFuture scanTaskFuture = null;
//创建连接器
public void createConnection(Channel channel) {
if (channel != null) {
//通过构造方法创建
Connection connection = new NettyConnection(channel, this);
//获取connKey并保存在CONNECTION_MAP中
String connKey = getConnectionKey(channel);
CONNECTION_MAP.put(connKey, connection);
}
}
/**
* Start the scan task for long-idle connections.
*/
//启动一个定时任务,定时任务下面讲解
private synchronized void startScan() {
if (scanTaskFuture == null
|| scanTaskFuture.isCancelled()
|| scanTaskFuture.isDone()) {
scanTaskFuture = TIMER.scheduleAtFixedRate(
new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
}
}
/**
* Format to "ip:port".
*
* @param channel channel
* @return formatted key
*/
private String getConnectionKey(Channel channel) {
InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress();
String remoteIp = socketAddress.getAddress().getHostAddress();
int remotePort = socketAddress.getPort();
return remoteIp + ":" + remotePort;
}
private String getConnectionKey(String ip, int port) {
return ip + ":" + port;
}
//刷新readTime,readTime为当前时间
public void refreshLastReadTime(Channel channel) {
if (channel != null) {
String connKey = getConnectionKey(channel);
Connection connection = CONNECTION_MAP.get(connKey);
if (connection != null) {
connection.refreshLastReadTime(System.currentTimeMillis());
}
}
}
//获取链接,直接从map中获取
public Connection getConnection(String remoteIp, int remotePort) {
String connKey = getConnectionKey(remoteIp, remotePort);
return CONNECTION_MAP.get(connKey);
}
public void remove(Channel channel) {
String connKey = getConnectionKey(channel);
CONNECTION_MAP.remove(connKey);
}
public List<Connection> listAllConnection() {
List<Connection> connections = new ArrayList<Connection>(CONNECTION_MAP.values());
return connections;
}
public int count() {
return CONNECTION_MAP.size();
}
public void clear() {
CONNECTION_MAP.clear();
}
//shutdownAll:关闭所有连接
public void shutdownAll() throws Exception {
for (Connection c : CONNECTION_MAP.values()) {
c.close();
}
}
//刷新定时任务
public void refreshIdleTask() {
if (scanTaskFuture == null || scanTaskFuture.cancel(false)) {
startScan();
} else {
RecordLog.info("The result of canceling scanTask is error.");
}
}
}
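连接池通常配合channel的生命周期回调来维护:连接建立时createConnection、收到数据时refreshLastReadTime、连接断开时remove。真实逻辑在handler包下(如TokenServerHandler)配合ConnectionPool完成,后文与client关联时再展开;下面用一个以Map代替连接池的独立示意说明这几个回调大致的调用时机:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 在channel生命周期回调中维护连接池的示意,用Map代替ConnectionPool。
 */
public class ConnectionLifecycleSketch extends ChannelInboundHandlerAdapter {
    // 简化的"连接池":("ip:port" -> 最近一次读时间)
    private final Map<String, Long> pool = new ConcurrentHashMap<>();

    private String key(ChannelHandlerContext ctx) {
        InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接建立:对应connectionPool.createConnection(channel)
        pool.put(key(ctx), System.currentTimeMillis());
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到数据:对应connectionPool.refreshLastReadTime(channel)
        pool.put(key(ctx), System.currentTimeMillis());
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // 连接断开:对应connectionPool.remove(channel)
        pool.remove(key(ctx));
        ctx.fireChannelInactive();
    }
}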
2.3.4 ScanIdleConnectionTask
ScanIdleConnectionTask是实现了Runnable接口的定时清理任务。
源码
public class ScanIdleConnectionTask implements Runnable {
private final ConnectionPool connectionPool;
//构造方法
public ScanIdleConnectionTask(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
@Override
public void run() {
try {
//获取连接最大空闲时间idleSeconds,默认600s
int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
long idleTimeMillis = idleSeconds * 1000;
if (idleTimeMillis < 0) {
idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
}
long now = System.currentTimeMillis();
//拿到所有的连接
List<Connection> connections = connectionPool.listAllConnection();
for (Connection conn : connections) {
//如果当前时间减去上次读取时间大于空闲时间,说明连接可以关闭了
if ((now - conn.getLastReadTime()) > idleTimeMillis) {
RecordLog.info(
String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
+ "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
);
//关闭链接
conn.close();
}
}
} catch (Throwable t) {
RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
}
}
}
2.3.5 连接组ConnectionGroup
连接组可以理解就是连接connection的集合。
实例变量
- namespace:命名空间
- connectionSet:ConnectionDescriptor的集合,ConnectionDescriptor的实例变量是address、host。
- connectedCount:当前连接数,定义为AtomicInteger类型
源码分析
public class ConnectionGroup {
private final String namespace;
private final Set<ConnectionDescriptor> connectionSet = Collections.synchronizedSet(new HashSet<ConnectionDescriptor>());
private final AtomicInteger connectedCount = new AtomicInteger();
//带namespace构造函数
public ConnectionGroup(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
this.namespace = namespace;
}
//无参构造函数
public ConnectionGroup() {
this(ServerConstants.DEFAULT_NAMESPACE);
}
//增加连接
public ConnectionGroup addConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");
//获取host,若是ip:port形式,就只需要ip
String[] ip = address.split(":");
String host;
if (ip != null && ip.length >= 1) {
host = ip[0];
} else {
host = address;
}
//ConnectionDescriptor重写了equals、hashCode方法,用于去重
boolean newAdded = connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host));
//增加成功,连接数加1
if (newAdded) {
connectedCount.incrementAndGet();
}
return this;
}
//移除连接
public ConnectionGroup removeConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");
if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) {
connectedCount.decrementAndGet();
}
return this;
}
}
2.3.6 连接管理ConnectionManager
顾名思义,ConnectionManager是对连接的管理。
实例变量
- CONN_MAP:Connection map (namespace, connection).
- NAMESPACE_MAP:namespace map (address, namespace).
源码分析
public final class ConnectionManager {
/**
* Connection map (namespace, connection).
*/
private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>();
/**
* namespace map (address, namespace).
*/
private static final Map<String, String> NAMESPACE_MAP = new ConcurrentHashMap<>();
/**
* Get connected count for specific namespace.
*
* @param namespace namespace to check
* @return connected count for specific namespace
*/
//获取namespace的连接数
public static int getConnectedCount(String namespace) {
AssertUtil.notEmpty(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
return group == null ? 0 : group.getConnectedCount();
}
//获取或创建连接组;注意:这里并没有创建具体的连接
public static ConnectionGroup getOrCreateGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
//synchronized锁住,防止并发问题
synchronized (CREATE_LOCK) {
if ((group = CONN_MAP.get(namespace)) == null) {
//创建并保存在CONN_MAP方便获取
group = new ConnectionGroup(namespace);
CONN_MAP.put(namespace, group);
}
}
}
return group;
}
//移除连接
public static void removeConnection(String address) {
AssertUtil.assertNotBlank(address, "address should not be empty");
String namespace = NAMESPACE_MAP.get(address);
if (namespace != null) {
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
return;
}
//调用ConnectionGroup的removeConnection方法(上面讲解过),实际就是从set中移除对应的连接
group.removeConnection(address);
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
}
//map中移除
NAMESPACE_MAP.remove(address);
}
//传入namespace与address两个参数移除,省去根据address查询namespace的一步
public static void removeConnection(String namespace, String address) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
AssertUtil.assertNotBlank(address, "address should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
return;
}
group.removeConnection(address);
NAMESPACE_MAP.remove(address);
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
}
//增加连接:把address加入对应namespace的ConnectionGroup,同时记录address到NAMESPACE_MAP中
public static ConnectionGroup addConnection(String namespace, String address) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
AssertUtil.assertNotBlank(address, "address should not be empty");
ConnectionGroup group = getOrCreateGroup(namespace);
group.addConnection(address);
NAMESPACE_MAP.put(address, namespace);
RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace);
return group;
}
//获取或创建连接组
public static ConnectionGroup getOrCreateConnectionGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = getOrCreateGroup(namespace);
return group;
}
// 拿到连接组
public static ConnectionGroup getConnectionGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
return group;
}
static void clear() {
CONN_MAP.clear();
NAMESPACE_MAP.clear();
}
private static final Object CREATE_LOCK = new Object();
private ConnectionManager() {}
}
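getOrCreateGroup中"先查Map、查不到再加锁并二次检查"的写法是典型的双重检查,用于避免并发下重复创建ConnectionGroup。下面是一个独立的最小示意:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GetOrCreateDemo {
    private static final Map<String, Object> GROUPS = new ConcurrentHashMap<>();
    private static final Object CREATE_LOCK = new Object();

    // 与ConnectionManager.getOrCreateGroup相同的"先查再加锁双重检查"写法
    static Object getOrCreateGroup(String namespace) {
        Object group = GROUPS.get(namespace);
        if (group == null) {
            synchronized (CREATE_LOCK) {
                if ((group = GROUPS.get(namespace)) == null) {
                    group = new Object();
                    GROUPS.put(namespace, group);
                }
            }
        }
        return group;
    }

    public static void main(String[] args) {
        // 两次获取同一namespace,返回的是同一个group实例
        System.out.println(getOrCreateGroup("default") == getOrCreateGroup("default")); // true
    }
}
在JDK 1.8下也可以直接用ConcurrentHashMap的computeIfAbsent(namespace, k -> new ConnectionGroup(k))达到同样的效果。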
2.4 处理器processor
这个包下主要是流控的处理器,分别有普通限流和热点限流处理器
2.4.1 接口RequestProcessor
public interface RequestProcessor<T, R> {
/**
* Process the cluster request.
*
* @param request Sentinel cluster request
* @return the response after processed
*/
//处理请求
//有两个实现类,分别是FlowRequestProcessor,ParamFlowRequestProcessor
ClusterResponse<R> processRequest(ClusterRequest<T> request);
}
2.4.2 RequestProcessorProvider
请求处理器提供者,类似于工厂类
实例变量
- PROCESSOR_MAP:请求类型对应的请求处理器
- SERVICE_LOADER:通过ServiceLoader加载RequestProcessor实现类
源码分析
public final class RequestProcessorProvider {
private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>();
//默认配置的实现类有FlowRequestProcessor,ParamFlowRequestProcessor
private static final ServiceLoader<RequestProcessor> SERVICE_LOADER = ServiceLoaderUtil.getServiceLoader(
RequestProcessor.class);
//静态代码块,类加载时执行
static {
loadAndInit();
}
private static void loadAndInit() {
for (RequestProcessor processor : SERVICE_LOADER) {
Integer type = parseRequestType(processor);
if (type != null) {
//放入map中
PROCESSOR_MAP.put(type, processor);
}
}
}
//获取RequestType
private static Integer parseRequestType(RequestProcessor processor) {
//配置在注解上
RequestType requestType = processor.getClass().getAnnotation(RequestType.class);
if (requestType != null) {
return requestType.value();
} else {
return null;
}
}
//获取RequestProcessor
public static RequestProcessor getProcessor(int type) {
return PROCESSOR_MAP.get(type);
}
static void addProcessorIfAbsent(int type, RequestProcessor processor) {
// TBD: use putIfAbsent in JDK 1.8.
if (PROCESSOR_MAP.containsKey(type)) {
return;
}
PROCESSOR_MAP.put(type, processor);
}
static void addProcessor(int type, RequestProcessor processor) {
AssertUtil.notNull(processor, "processor cannot be null");
PROCESSOR_MAP.put(type, processor);
}
private RequestProcessorProvider() {}
}
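parseRequestType的关键是通过反射读取实现类上的@RequestType注解来确定该processor处理的消息类型。下面是一个独立的小示例,用自定义注解复现这种"注解 + 反射注册"的方式(注解名、类型取值均为演示用假设,并非Sentinel真实定义):
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AnnotationRegistryDemo {
    // 模拟@RequestType:标注处理器能处理的消息类型
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface MsgType {
        int value();
    }

    interface Processor {
        String process(Object data);
    }

    // 模拟FlowRequestProcessor:这里假设类型1代表FLOW
    @MsgType(1)
    static class FlowProcessor implements Processor {
        @Override
        public String process(Object data) {
            return "flow processed: " + data;
        }
    }

    private static final Map<Integer, Processor> PROCESSOR_MAP = new ConcurrentHashMap<>();

    // 对应parseRequestType + loadAndInit的核心逻辑:读注解、放入map
    static void register(Processor processor) {
        MsgType type = processor.getClass().getAnnotation(MsgType.class);
        if (type != null) {
            PROCESSOR_MAP.put(type.value(), processor);
        }
    }

    public static void main(String[] args) {
        register(new FlowProcessor());
        System.out.println(PROCESSOR_MAP.get(1).process("flowId=111"));
    }
}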
2.4.3 FlowRequestProcessor
源码分析
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
//获取TokenService,配置的是DefaultTokenService
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();
//获取请求token
TokenResult result = tokenService.requestToken(flowId, count, prioritized);
//解析响应结果
return toResponse(result, request);
}
private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(result.getWaitInMs())
);
}
}
2.4.4 ParamFlowRequestProcessor
源码分析
@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW)
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<ParamFlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
Collection<Object> args = request.getData().getParams();
//请求热点参数
TokenResult result = tokenService.requestParamToken(flowId, count, args);
//解析响应结果
return toResponse(result, request);
}
private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(0)
);
}
}
TokenService中具体的流控处理逻辑,在下一篇讲解。
2.5 启动加载器DefaultClusterServerInitFunc
源码
//实现了InitFunc,InitFunc会在系统启动时加载
public class DefaultClusterServerInitFunc implements InitFunc {
@Override
public void init() throws Exception {
//初始化Decoders
initDefaultEntityDecoders();
//初始化writers
initDefaultEntityWriters();
//初始化processors
initDefaultProcessors();
// Eagerly-trigger the SPI pre-load of token service.
// 这个时候就把TokenService加载好了
TokenServiceProvider.getService();
RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered");
}
private void initDefaultEntityWriters() {
//ping
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter());
//流控Writer
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter());
//热点参数Writer和Flow一样
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter());
}
private void initDefaultEntityDecoders() {
//ping
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder());
//普通Flow
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder());
//热点参数
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder());
}
private void initDefaultProcessors() {
// Eagerly-trigger the SPI pre-load.
//随便获取一次processor,目的是触发RequestProcessorProvider的类加载(该类上面已经讲过)
RequestProcessorProvider.getProcessor(0);
}
}
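如果想在启动时注册自定义的编解码器或processor,也可以按同样的方式提供一个InitFunc实现并通过META-INF/services声明;下面是一个假设的示例(类名为演示用,触发时机由Sentinel初始化流程统一执行,具体以所用版本为准):
import com.alibaba.csp.sentinel.init.InitFunc;

/**
 * 自定义InitFunc示例:与DefaultClusterServerInitFunc一样,
 * 在META-INF/services/com.alibaba.csp.sentinel.init.InitFunc文件中
 * 声明本类的全限定名后,启动时即会被加载并执行init()。
 */
public class MyServerInitFunc implements InitFunc {
    @Override
    public void init() throws Exception {
        // 这里可以像DefaultClusterServerInitFunc那样注册自定义的编解码器或processor
        System.out.println("[MyServerInitFunc] custom init executed");
    }
}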
2.6 TokenServiceProvider
与RequestProcessorProvider类似;可以看出Sentinel源码的作者习惯于用Provider作为工厂类使用。
源码
public final class TokenServiceProvider {
private static TokenService service = null;
static {
resolveTokenServiceSpi();
}
public static TokenService getService() {
return service;
}
private static void resolveTokenServiceSpi() {
//加载TokenService,若不存在则使用默认的DefaultTokenService,这里使用的就是默认实现
service = SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);
if (service != null) {
RecordLog.info("[TokenServiceProvider] Global token service resolved: "
+ service.getClass().getCanonicalName());
} else {
RecordLog.warn("[TokenServiceProvider] Unable to resolve TokenService: no SPI found");
}
}
}
3、其他内容
- ClusterTokenServer:TokenServer相关的内容在后面与client关联时再讲解,这部分有NettyTransportServer、SentinelDefaultTokenServer、DefaultEmbeddedTokenServer类,以及handler包下的TokenServerHandler。
- log包下的ClusterServerStatLogUtil主要用于记录集群流控的统计日志
- command.handler包下类主要用于管理中心动态配置的查询与变更的交互:如流控规则、监控指标、集群配置等