Sentinel

Sentinel Dashboard和应用通信

2021-06-26  本文已影响0人  晴天哥_王志

系列

开篇

启动入口

// Dashboard侧的启动流程
@SpringBootApplication
public class DashboardApplication {

    public static void main(String[] args) {
        triggerSentinelInit();
        SpringApplication.run(DashboardApplication.class, args);
    }

    private static void triggerSentinelInit() {
        new Thread(() -> InitExecutor.doInit()).start(); // @1
    }
}

// App侧的启动流程
public class Env {
    public static final Sph sph = new CtSph();
    static {
        InitExecutor.doInit();// @2
    }
}

InitExecutor

public final class InitExecutor {

    private static AtomicBoolean initialized = new AtomicBoolean(false);

    public static void doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }

        try {
            // 通过SPI加载com.alibaba.csp.sentinel.init.InitFunc的实现类
            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : initFuncs) {
                insertSorted(initList, initFunc);
            }
            // 遍历并初始化所有的InitFunc
            for (OrderWrapper w : initList) {
                w.func.init();
            }
        } catch (Exception ex) {
        } catch (Error error) {
        }
    }
}

// 返回的InitFunc对象的服务 
com.alibaba.csp.sentinel.metric.extension.MetricCallbackInit
com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc
com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc
com.alibaba.csp.sentinel.init.ParamFlowStatisticSlotCallbackInit

MetricCallbackInit

public class MetricCallbackInit implements InitFunc {
    @Override
    public void init() throws Exception {
        StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
            new MetricEntryCallback());
        StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
            new MetricExitCallback());
    }
}

HeartbeatSenderInitFunc

@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {

    private ScheduledExecutorService pool = null;

    private void initSchedulerIfNeeded() {
        if (pool == null) {
            pool = new ScheduledThreadPoolExecutor(2,
                new NamedThreadFactory("sentinel-heartbeat-send-task", true),
                new DiscardOldestPolicy());
        }
    }

    @Override
    public void init() {
        HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
        initSchedulerIfNeeded();
        long interval = retrieveInterval(sender);
        setIntervalIfNotExists(interval);
        scheduleHeartbeatTask(sender, interval); // @1
    }

    private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    sender.sendHeartbeat();
                } catch (Throwable e) {
                }
            }
        }, 5000, interval, TimeUnit.MILLISECONDS);
    }
}

public class SimpleHttpHeartbeatSender implements HeartbeatSender {

    private static final int OK_STATUS = 200;
    private static final long DEFAULT_INTERVAL = 1000 * 10;
    private final HeartbeatMessage heartBeat = new HeartbeatMessage();
    private final SimpleHttpClient httpClient = new SimpleHttpClient();
    private final List<Endpoint> addressList;
    private int currentAddressIdx = 0;

    public SimpleHttpHeartbeatSender() {
        List<Endpoint> newAddrs = TransportConfig.getConsoleServerList();
        this.addressList = newAddrs;
    }

    @Override
    public boolean sendHeartbeat() throws Exception {
        if (TransportConfig.getRuntimePort() <= 0) {
            return false;
        }
        Endpoint addrInfo = getAvailableAddress(); // @1
        if (addrInfo == null) {
            // 地址为空的情况下直接返回不进行心跳
            return false;
        }

        SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
        request.setParams(heartBeat.generateCurrentMessage());
        try {
            SimpleHttpResponse response = httpClient.post(request);
            if (response.getStatusCode() == OK_STATUS) {
                return true;
            } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
            }
        } catch (Exception e) {
        }
        return false;
    }
}

ParamFlowStatisticSlotCallbackInit

public class ParamFlowStatisticSlotCallbackInit implements InitFunc {

    @Override
    public void init() {
        StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
            new ParamFlowStatisticEntryCallback());
        StatisticSlotCallbackRegistry.addExitCallback(ParamFlowStatisticExitCallback.class.getName(),
            new ParamFlowStatisticExitCallback());
    }
}

CommandCenterInitFunc

@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        // @1返回的是SimpleHttpCommandCenter
        CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
        // @2执行SimpleHttpCommandCenter的beforeStart和start
        commandCenter.beforeStart();
        commandCenter.start();
}


public final class CommandCenterProvider {

    private static CommandCenter commandCenter = null;
    static {
        resolveInstance();
    }

    private static void resolveInstance() {
        // @1 获取高优先级的高的CommandCenter对象SimpleHttpCommandCenter
        CommandCenter resolveCommandCenter = SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();
        commandCenter = resolveCommandCenter;
    }

    public static CommandCenter getCommandCenter() {
        return commandCenter;
    }
}

SimpleHttpCommandCenter

public class SimpleHttpCommandCenter implements CommandCenter {

    private static final int PORT_UNINITIALIZED = -1;
    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
    private static final int DEFAULT_PORT = 8719;

    @SuppressWarnings("rawtypes")
    private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor"));
    private ExecutorService bizExecutor;
    private ServerSocket socketReference;

    @Override
    @SuppressWarnings("rawtypes")
    public void beforeStart() throws Exception {
        // 查找并注册所有CommandHandler对象
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });
        // 启动任务
        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                boolean success = false;
                ServerSocket serverSocket = getServerSocketFromBasePort(port);

                if (serverSocket != null) {
                    socketReference = serverSocket;
                    // 执行启动任务ServerThread
                    executor.submit(new ServerThread(serverSocket));
                    success = true;
                    port = serverSocket.getLocalPort();
                } else {
                }

                if (!success) {
                    port = PORT_UNINITIALIZED;
                }

                TransportConfig.setRuntimePort(port);
                executor.shutdown();
            }

        };
        // 启动单独线程启动初始化任务
        new Thread(serverInitTask).start();
    }


    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                        }
                    }
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        break;
                    }
                }
            }
        }
    }

    private static ServerSocket getServerSocketFromBasePort(int basePort) {
        int tryCount = 0;
        while (true) {
            try {
                ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
                server.setReuseAddress(true);
                return server;
            } catch (IOException e) {
                tryCount++;
                try {
                    TimeUnit.MILLISECONDS.sleep(30);
                } catch (InterruptedException e1) {
                    break;
                }
            }
        }
        return null;
    }
}
public class HttpEventTask implements Runnable {

    public static final String SERVER_ERROR_MESSAGE = "Command server error";
    public static final String INVALID_COMMAND_MESSAGE = "Invalid command";
    private final Socket socket;
    private boolean writtenHead = false;

    public HttpEventTask(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        PrintWriter printWriter = null;
        InputStream inputStream = null;
        try {
            long start = System.currentTimeMillis();
            inputStream = new BufferedInputStream(socket.getInputStream());
            OutputStream outputStream = socket.getOutputStream();
            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
            String firstLine = readLine(inputStream);

            CommandRequest request = processQueryString(firstLine);
            if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
                // Deal with post method
                processPostRequest(inputStream, request);
            }

            // Validate the target command.
            String commandName = HttpCommandUtils.getTarget(request);
            if (StringUtil.isBlank(commandName)) {
                writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
                return;
            }

            // Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter);
            } else {
                // No matching command handler.
                writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
            }

            long cost = System.currentTimeMillis() - start;
        } catch (RequestException e) {
            // 省略代码
        } catch (Throwable e) {
            // 省略代码
        } finally {
            closeResource(inputStream);
            closeResource(printWriter);
            closeResource(socket);
        }
    }
}

CommandHandler

com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.FetchClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.ApiCommandHandler
com.alibaba.csp.sentinel.demo.commandhandler.EchoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.UpdateGatewayApiDefinitionGroupCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.UpdateGatewayRuleCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.GetGatewayApiDefinitionGroupCommandHandler
com.alibaba.csp.sentinel.adapter.gateway.common.command.GetGatewayRuleCommandHandler

Dashboard的Controller

├── AppController.java
├── AuthController.java
├── AuthorityRuleController.java
├── DegradeController.java
├── DemoController.java
├── FlowControllerV1.java
├── MachineRegistryController.java
├── MetricController.java
├── ParamFlowRuleController.java
├── ResourceController.java
├── SystemController.java
├── VersionController.java
├── cluster
│   ├── ClusterAssignController.java
│   └── ClusterConfigController.java
├── gateway
│   ├── GatewayApiController.java
│   └── GatewayFlowRuleController.java
└── v2
    └── FlowControllerV2.java



上一篇 下一篇

猜你喜欢

热点阅读