Communication Between the Sentinel Dashboard and Applications
2021-06-26
晴天哥_王志
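Series
- Introduction to the Sentinel workflow
- Structure of the Sentinel resource node tree
- Introduction to the Sentinel sliding window
- Sentinel flow control
- Introduction to Sentinel's chain-of-responsibility slots
- Sentinel circuit breaking and degradation
- Communication between the Sentinel Dashboard and applications
- Sentinel console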
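Introduction
- This article describes how the Dashboard and the application side (the App) communicate.
- 1. The App reports its own ip and port to the Dashboard server through heartbeats.
- 2. The Dashboard UI asks the Dashboard backend, which in turn requests the data from the App.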
Startup entry points
// Startup flow on the Dashboard side
@SpringBootApplication
public class DashboardApplication {
public static void main(String[] args) {
triggerSentinelInit();
SpringApplication.run(DashboardApplication.class, args);
}
private static void triggerSentinelInit() {
new Thread(() -> InitExecutor.doInit()).start(); // @1
}
}
// Startup flow on the App side
public class Env {
public static final Sph sph = new CtSph();
static {
InitExecutor.doInit();// @2
}
}
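- @1 The Dashboard starts initialization through InitExecutor#doInit, run on a separate thread.
- @2 The App side starts initialization through InitExecutor#doInit, called from the static initializer of Env.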
InitExecutor
public final class InitExecutor {
private static AtomicBoolean initialized = new AtomicBoolean(false);
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// Load the implementations of com.alibaba.csp.sentinel.init.InitFunc through SPI
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
insertSorted(initList, initFunc);
}
// Iterate over all InitFunc instances and initialize each one
for (OrderWrapper w : initList) {
w.func.init();
}
} catch (Exception ex) {
} catch (Error error) {
}
}
}
// InitFunc implementations returned by the SPI lookup
com.alibaba.csp.sentinel.metric.extension.MetricCallbackInit
com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc
com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc
com.alibaba.csp.sentinel.init.ParamFlowStatisticSlotCallbackInit
- doInit loads MetricCallbackInit, CommandCenterInitFunc, HeartbeatSenderInitFunc, and ParamFlowStatisticSlotCallbackInit through SPI and calls init() on each of them in sorted order.
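The two ideas in doInit (a compareAndSet guard so that initialization runs once, and running the loaded functions in sorted order) can be sketched in isolation. This is a minimal sketch, not Sentinel's code: `OrderedInitSketch` and `Step` are hypothetical names, the rule that a smaller order value runs first is an assumption, and so is the large order value given to MetricCallbackInit (the excerpt only shows `@InitOrder(-1)` on the other two).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class OrderedInitSketch {
    /** Hypothetical stand-in for an InitFunc together with its order value. */
    static final class Step {
        final String name;
        final int order;

        Step(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    private static final AtomicBoolean initialized = new AtomicBoolean(false);
    /** Names of the steps in the order they were executed. */
    static final List<String> executed = new ArrayList<>();

    static void doInit(List<Step> steps) {
        // Only the first caller gets past this guard; later calls return at once.
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        List<Step> sorted = new ArrayList<>(steps);
        // Assumption: smaller order runs first. List.sort is stable, so steps
        // with the same order keep their original relative position.
        sorted.sort(Comparator.comparingInt((Step s) -> s.order));
        for (Step s : sorted) {
            executed.add(s.name); // stand-in for w.func.init()
        }
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new Step("MetricCallbackInit", Integer.MAX_VALUE), // assumed order value
            new Step("CommandCenterInitFunc", -1),
            new Step("HeartbeatSenderInitFunc", -1));
        doInit(steps);
        doInit(steps); // no-op: the guard has already been flipped
        System.out.println(executed);
    }
}
```

Because of the guard, it does not matter that both the Dashboard thread and the static block of Env call doInit: whichever call arrives first does the work.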
MetricCallbackInit
public class MetricCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
}
- MetricCallbackInit registers the MetricEntryCallback and MetricExitCallback callbacks with StatisticSlotCallbackRegistry.
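The registration pattern used here is a keyed callback registry: callbacks are stored under a name and all of them are invoked when an event occurs. The following is a rough sketch of that pattern only. `CallbackRegistrySketch`, `onPass`, and the `Consumer<String>` callback type are hypothetical simplifications and do not mirror the real StatisticSlotCallbackRegistry signatures.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class CallbackRegistrySketch {
    // key -> callback; a LinkedHashMap keeps registration order for this demo.
    private static final Map<String, Consumer<String>> entryCallbacks = new LinkedHashMap<>();

    /** Registers a callback under a unique key; a second call with the same key replaces it. */
    static synchronized void addEntryCallback(String key, Consumer<String> callback) {
        entryCallbacks.put(key, callback);
    }

    /** Hypothetical hook: notifies every registered callback that a resource was entered. */
    static synchronized void onPass(String resource) {
        for (Consumer<String> callback : entryCallbacks.values()) {
            callback.accept(resource);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        addEntryCallback("collector", seen::add);
        addEntryCallback("logger", r -> System.out.println("entered " + r));
        onPass("orderService");
        System.out.println(seen);
    }
}
```

Keying by class name, as MetricCallbackInit does, keeps a repeated registration from adding the same callback twice.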
HeartbeatSenderInitFunc
@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {
private ScheduledExecutorService pool = null;
private void initSchedulerIfNeeded() {
if (pool == null) {
pool = new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("sentinel-heartbeat-send-task", true),
new DiscardOldestPolicy());
}
}
@Override
public void init() {
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
initSchedulerIfNeeded();
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
scheduleHeartbeatTask(sender, interval); // @1
}
private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
}
}
- At @1, HeartbeatSenderInitFunc calls scheduleHeartbeatTask, which sends a heartbeat at a fixed rate after an initial delay of 5000 ms.
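The scheduling itself is plain `ScheduledExecutorService.scheduleAtFixedRate`. One detail worth seeing in isolation is the `catch (Throwable e)` inside the task: if a task scheduled this way throws, its later executions are suppressed, so swallowing the failure keeps the heartbeat alive. A minimal sketch, with `HeartbeatScheduleSketch` and `runHeartbeats` as hypothetical names and a latch standing in for `sender.sendHeartbeat()`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class HeartbeatScheduleSketch {
    /**
     * Schedules a fake heartbeat and waits until it has fired {@code expected} times.
     * Returns true if that happened within {@code timeoutMs}.
     */
    static boolean runHeartbeats(int expected, long initialDelayMs, long intervalMs, long timeoutMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "heartbeat-sketch");
            t.setDaemon(true); // daemon thread, so it never blocks JVM exit
            return t;
        });
        CountDownLatch latch = new CountDownLatch(expected);
        pool.scheduleAtFixedRate(() -> {
            try {
                latch.countDown(); // stand-in for sender.sendHeartbeat()
            } catch (Throwable e) {
                // Swallow failures: an exception escaping the task would
                // cancel all subsequent executions.
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Short delays so the demo finishes quickly; the excerpt uses 5000 ms and a 10 s default.
        System.out.println("three heartbeats fired: " + runHeartbeats(3, 50, 20, 2000));
    }
}
```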
public class SimpleHttpHeartbeatSender implements HeartbeatSender {
private static final int OK_STATUS = 200;
private static final long DEFAULT_INTERVAL = 1000 * 10;
private final HeartbeatMessage heartBeat = new HeartbeatMessage();
private final SimpleHttpClient httpClient = new SimpleHttpClient();
private final List<Endpoint> addressList;
private int currentAddressIdx = 0;
public SimpleHttpHeartbeatSender() {
List<Endpoint> newAddrs = TransportConfig.getConsoleServerList();
this.addressList = newAddrs;
}
@Override
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
return false;
}
Endpoint addrInfo = getAvailableAddress(); // @1
if (addrInfo == null) {
// No address available: return without sending a heartbeat
return false;
}
SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
request.setParams(heartBeat.generateCurrentMessage());
try {
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
} else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
}
} catch (Exception e) {
}
return false;
}
}
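- At @1, sendHeartbeat in SimpleHttpHeartbeatSender returns immediately when no Endpoint is available.
- On the Dashboard side the Endpoint is null, so sendHeartbeat returns without sending anything.
- On the App side, once the Dashboard Endpoint is configured, the heartbeat reports the App's ip:port pair.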
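The excerpt does not show getAvailableAddress, only the `addressList` and `currentAddressIdx` fields it presumably works with. One plausible way to use such fields is to keep sending to the current address and move to the next one after a failure. The sketch below illustrates that idea and is not the library's implementation; `AddressRotationSketch`, `current`, and `markFailed` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;

final class AddressRotationSketch {
    private final List<String> addresses;
    private int currentIdx = 0;

    AddressRotationSketch(List<String> addresses) {
        this.addresses = List.copyOf(addresses);
    }

    /** Returns the address to use now, or empty when none is configured. */
    Optional<String> current() {
        if (addresses.isEmpty()) {
            return Optional.empty(); // mirrors the "return false" early exit
        }
        return Optional.of(addresses.get(currentIdx % addresses.size()));
    }

    /** Hypothetical failure hook: advance to the next configured address. */
    void markFailed() {
        if (!addresses.isEmpty()) {
            currentIdx = (currentIdx + 1) % addresses.size();
        }
    }

    public static void main(String[] args) {
        AddressRotationSketch none = new AddressRotationSketch(List.of());
        System.out.println(none.current().isPresent()); // no console address configured

        AddressRotationSketch two = new AddressRotationSketch(List.of("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println(two.current().get());
        two.markFailed();
        System.out.println(two.current().get());
    }
}
```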
ParamFlowStatisticSlotCallbackInit
public class ParamFlowStatisticSlotCallbackInit implements InitFunc {
@Override
public void init() {
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
new ParamFlowStatisticEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(ParamFlowStatisticExitCallback.class.getName(),
new ParamFlowStatisticExitCallback());
}
}
- ParamFlowStatisticSlotCallbackInit registers ParamFlowStatisticEntryCallback and ParamFlowStatisticExitCallback.
CommandCenterInitFunc
@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {
@Override
public void init() throws Exception {
// @1 Returns a SimpleHttpCommandCenter
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
// @2 Run beforeStart and start on the SimpleHttpCommandCenter
commandCenter.beforeStart();
commandCenter.start();
}
}
public final class CommandCenterProvider {
private static CommandCenter commandCenter = null;
static {
resolveInstance();
}
private static void resolveInstance() {
// @1 Load the highest-priority CommandCenter instance, here SimpleHttpCommandCenter
CommandCenter resolveCommandCenter = SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();
commandCenter = resolveCommandCenter;
}
public static CommandCenter getCommandCenter() {
return commandCenter;
}
}
- @1 CommandCenterProvider#getCommandCenter returns a SimpleHttpCommandCenter.
- @2 The CommandCenter is started through beforeStart and start of SimpleHttpCommandCenter.
SimpleHttpCommandCenter
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
@SuppressWarnings("rawtypes")
private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor"));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Look up and register all CommandHandler instances
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
// Server initialization task
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
socketReference = serverSocket;
// Submit the ServerThread that accepts connections
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
// Run the initialization task on a separate thread
new Thread(serverInitTask).start();
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
break;
}
}
}
}
}
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
}
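- beforeStart of SimpleHttpCommandCenter registers the CommandHandler instances.
- start of SimpleHttpCommandCenter binds a ServerSocket and launches a ServerThread, which accepts incoming connections and hands each one to bizExecutor as an HttpEventTask.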
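The expression `basePort + tryCount / 3` in getServerSocketFromBasePort uses integer division, so each candidate port is tried three times before the next one is attempted. A small worked example makes the sequence visible; `PortProbeSketch` and `portForAttempt` are hypothetical names wrapped around the expression from the excerpt.

```java
final class PortProbeSketch {
    /** Port tried on the given attempt (0-based), using the expression from the excerpt. */
    static int portForAttempt(int basePort, int tryCount) {
        return basePort + tryCount / 3; // integer division: 0,0,0,1,1,1,2,...
    }

    public static void main(String[] args) {
        int basePort = 8719; // DEFAULT_PORT in the excerpt
        for (int tryCount = 0; tryCount < 7; tryCount++) {
            System.out.println("attempt " + tryCount + " -> port " + portForAttempt(basePort, tryCount));
        }
    }
}
```

With the default base port, attempts 0 to 2 target 8719, attempts 3 to 5 target 8720, and attempt 6 moves on to 8721. This is why a second App on the same host ends up on a neighbouring port, and why the actual port is read back with getLocalPort and stored through TransportConfig.setRuntimePort.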
public class HttpEventTask implements Runnable {
public static final String SERVER_ERROR_MESSAGE = "Command server error";
public static final String INVALID_COMMAND_MESSAGE = "Invalid command";
private final Socket socket;
private boolean writtenHead = false;
public HttpEventTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandRequest request = processQueryString(firstLine);
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
// Deal with post method
processPostRequest(inputStream, request);
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
// No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
} catch (RequestException e) {
// Code omitted
} catch (Throwable e) {
// Code omitted
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
}
- HttpEventTask parses the request and looks up the matching CommandHandler to run the command logic.
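To make the parsing step concrete, here is a rough sketch of how the first line of an HTTP request can be split into a command name and its parameters. It only approximates what processQueryString and HttpCommandUtils.getTarget do together; the class `RequestLineSketch` and its fields are hypothetical, and the request line used in the demo is just an illustrative input.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class RequestLineSketch {
    final String target;              // command name, e.g. the path without the leading slash
    final Map<String, String> params; // decoded query parameters

    private RequestLineSketch(String target, Map<String, String> params) {
        this.target = target;
        this.params = params;
    }

    static RequestLineSketch parse(String firstLine) {
        // "GET /getRules?type=flow HTTP/1.1" -> [method, uri, version]
        String[] parts = firstLine.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed request line: " + firstLine);
        }
        String uri = parts[1];
        int q = uri.indexOf('?');
        String path = q >= 0 ? uri.substring(0, q) : uri;
        String query = q >= 0 ? uri.substring(q + 1) : "";
        String target = path.startsWith("/") ? path.substring(1) : path;

        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue; // no query string, or a stray '&'
            }
            int eq = pair.indexOf('=');
            String key = eq >= 0 ? pair.substring(0, eq) : pair;
            String value = eq >= 0 ? pair.substring(eq + 1) : "";
            params.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
                       URLDecoder.decode(value, StandardCharsets.UTF_8));
        }
        return new RequestLineSketch(target, params);
    }

    public static void main(String[] args) {
        RequestLineSketch r = parse("GET /getRules?type=flow HTTP/1.1");
        System.out.println(r.target + " " + r.params);
    }
}
```

An empty target corresponds to the case where HttpEventTask answers with BAD_REQUEST and "Invalid command".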
CommandHandler
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.FetchClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.ApiCommandHandler
com.alibaba.csp.sentinel.demo.commandhandler.EchoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.UpdateGatewayApiDefinitionGroupCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.UpdateGatewayRuleCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.GetGatewayApiDefinitionGroupCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.GetGatewayRuleCommandHandler
- The list above shows the CommandHandler implementations that are currently built in and that the Dashboard can call.
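The lookup that HttpEventTask performs against these handlers boils down to a name-to-handler map with two failure cases: a blank name and an unknown name. A simplified sketch follows, with `CommandDispatchSketch` as a hypothetical class, a `Function` standing in for CommandHandler, plain strings standing in for CommandResponse, and an "echo" handler invented for the demo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CommandDispatchSketch {
    // command name -> handler; the handler maps request parameters to a response body
    private static final Map<String, Function<Map<String, String>, String>> handlers =
        new ConcurrentHashMap<>();

    static void register(String name, Function<Map<String, String>, String> handler) {
        handlers.put(name, handler);
    }

    /** Returns "status body", following the branches of HttpEventTask#run. */
    static String dispatch(String name, Map<String, String> params) {
        if (name == null || name.trim().isEmpty()) {
            return "400 Invalid command";
        }
        Function<Map<String, String>, String> handler = handlers.get(name);
        if (handler == null) {
            return "400 Unknown command `" + name + "`";
        }
        return "200 " + handler.apply(params);
    }

    public static void main(String[] args) {
        // Hypothetical handler used only for this demo.
        register("echo", params -> params.getOrDefault("msg", ""));
        System.out.println(dispatch("echo", Map.of("msg", "hi")));
        System.out.println(dispatch("nope", Map.of()));
        System.out.println(dispatch(" ", Map.of()));
    }
}
```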
Dashboard Controllers
├── AppController.java
├── AuthController.java
├── AuthorityRuleController.java
├── DegradeController.java
├── DemoController.java
├── FlowControllerV1.java
├── MachineRegistryController.java
├── MetricController.java
├── ParamFlowRuleController.java
├── ResourceController.java
├── SystemController.java
├── VersionController.java
├── cluster
│ ├── ClusterAssignController.java
│ └── ClusterConfigController.java
├── gateway
│ ├── GatewayApiController.java
│ └── GatewayFlowRuleController.java
└── v2
└── FlowControllerV2.java
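- The Dashboard uses different Controllers to handle different kinds of logic.
- MachineRegistryController handles heartbeat requests from the App side.
- The other Controllers handle web requests and forward them to the App side for processing.
- The Dashboard server communicates with the App side through SentinelApiClient to fetch the various kinds of data.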
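Putting the two directions together: the heartbeat tells the Dashboard which ip:port each App listens on, and the Dashboard later uses that address to call a command on the App. The sketch below shows only this bookkeeping. It is an illustration under assumptions, not the Dashboard's code: `MachineRegistrySketch`, `onHeartbeat`, `machinesOf`, and `commandUrl` are hypothetical names, and the URL shape `http://ip:port/command?query` is inferred from how HttpEventTask parses requests.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class MachineRegistrySketch {
    // app name -> set of "ip:port" entries learned from heartbeats
    private final Map<String, Set<String>> machines = new ConcurrentHashMap<>();

    /** Direction 1: the App reports its address; repeated heartbeats are idempotent. */
    void onHeartbeat(String app, String ip, int port) {
        machines.computeIfAbsent(app, k -> ConcurrentHashMap.newKeySet()).add(ip + ":" + port);
    }

    Set<String> machinesOf(String app) {
        return machines.getOrDefault(app, Set.of());
    }

    /** Direction 2: the Dashboard builds the URL of a command on a registered machine. */
    static String commandUrl(String ip, int port, String command, String query) {
        String base = "http://" + ip + ":" + port + "/" + command;
        return (query == null || query.isEmpty()) ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        MachineRegistrySketch registry = new MachineRegistrySketch();
        registry.onHeartbeat("demo-app", "10.0.0.5", 8719);
        registry.onHeartbeat("demo-app", "10.0.0.5", 8719); // same machine, next heartbeat
        System.out.println(registry.machinesOf("demo-app"));
        System.out.println(commandUrl("10.0.0.5", 8719, "getRules", "type=flow"));
    }
}
```

In the real Dashboard the second step goes through SentinelApiClient, which issues the HTTP request and parses the response.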