转载-mycat启动分析

2018-07-05  本文已影响12人  SteveGuRen
  1. 程序的入口是io.mycat.mycat2.MycatCore.
    在main 方法中 首选取得ProxyRuntime的实例,该类是一个单例模式.
runtime.setConfig(new MycatConfig());

该方法为runtime设置了一个MycatConfig ,MycatConfig 是负责读取配置文件的.

ConfigLoader.INSTANCE.loadCore();

调用ConfigLoader 进行配置文件的加载.

3.1. 加载mycat.yml,heartbeat.yml,cluster.yml,balancer.yml,user.yml 的配置文件.代码如下:

public void loadCore() throws IOException {
    loadConfig(ConfigEnum.PROXY, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.HEARTBEAT, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.CLUSTER, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.BALANCER, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.USER, GlobalBean.INIT_VERSION);
}

3.2 进行配置文件的加载,调用了YamlUtil进行加载,并将配置信息赋值给MycatConfig,关键代码如下 :

  conf.putConfig(configEnum, (Configurable) YamlUtil.load(fileName, configEnum.getClazz()), version);

3.3 在io.mycat.util.YamlUtil.load(String, Class<T>) 中,处理也很简单,通过Yaml进行文件的加载,然后通过反射机制将属性进行赋值.代码如下:

/**
 * 从指定的文件中加载配置
 * @param fileName 需要加载的文件名
 * @param clazz 加载后需要转换成的类对象
 * @return
 * @throws FileNotFoundException
 */
public static <T> T load(String fileName, Class<T> clazz) throws FileNotFoundException {
    InputStreamReader fis = null;
    try {
        URL url = YamlUtil.class.getClassLoader().getResource(fileName);
        if (url != null) {
            Yaml yaml = new Yaml();
            fis = new InputStreamReader(new FileInputStream(url.getFile()), StandardCharsets.UTF_8);
            T obj = yaml.loadAs(fis, clazz);
            return obj;
        }
        return null;
    } finally {
        if (fis != null) {
            try {
                fis.close();
            } catch (IOException ignored) {
            }
        }
    }
}
  1. 进行通过命令行传入参数的解析.如:可以传入
    • -mycat.proxy.port 8067
    • -mycat.cluster.enable true
    • -mycat.cluster.port 9067
    • -mycat.cluster.myNodeId leader-2

参数等
支持的参数可在 io.mycat.mycat2.beans.ArgsBean 中进行查看.

获得参数后通过遍历进行赋值操作即可,代码如下:

private static void solveArgs(String[] args) {
int lenght = args.length;

MycatConfig conf = ProxyRuntime.INSTANCE.getConfig();
ProxyConfig proxyConfig = conf.getConfig(ConfigEnum.PROXY);
ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER);
BalancerConfig balancerConfig= conf.getConfig(ConfigEnum.BALANCER);

for (int i = 0; i < lenght; i++) {
    switch(args[i]) {
        case ArgsBean.PROXY_PORT:
            proxyConfig.getProxy().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.CLUSTER_ENABLE:
            clusterConfig.getCluster().setEnable(Boolean.parseBoolean(args[++i]));
            break;
        case ArgsBean.CLUSTER_PORT:
            clusterConfig.getCluster().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.CLUSTER_MY_NODE_ID:
            clusterConfig.getCluster().setMyNodeId(args[++i]);
            break;
        case ArgsBean.BALANCER_ENABLE:
            balancerConfig.getBalancer().setEnable(Boolean.parseBoolean(args[++i]));
            break;
        case ArgsBean.BALANCER_PORT:
            balancerConfig.getBalancer().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.BALANCER_STRATEGY:
            BalancerBean.BalancerStrategyEnum strategy = BalancerBean.BalancerStrategyEnum.getEnum(args[++i]);
            if (strategy == null) {
                throw new IllegalArgumentException("no such balancer strategy");
            }
            balancerConfig.getBalancer().setStrategy(strategy);
            break;
        default:
            break;
    }
}
}
  1. 设置NioReactorThreads,线程数目按照cpu的数目而定.
int cpus = Runtime.getRuntime().availableProcessors();
runtime.setNioReactorThreads(cpus);
runtime.setReactorThreads(new MycatReactorThread[cpus]);

6.设置MycatSessionManager

  1. 然后调用io.mycat.proxy.ProxyRuntime 的init方法,进行资源的初始化.代码如下:
public void init() {
//心跳调度独立出来,避免被其他任务影响
heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
HeartbeatConfig heartbeatConfig = config.getConfig(ConfigEnum.HEARTBEAT);
timerExecutor = ExecutorUtil.create("Timer", heartbeatConfig.getHeartbeat().getTimerExecutor());
businessExecutor = ExecutorUtil.create("BusinessExecutor",Runtime.getRuntime().availableProcessors());
listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor);
MatchMethodGenerator.initShrinkCharTbl();
}

7.1. 建立心跳线程, 原因是避免被其他任务影响.

7.2 建立timerExecutor,线程数目默认为2.

7.3 建立businessExecutor,线程数目为cpu数目.

7.4 建立listeningExecutorService, 该ExecutorService将会进行任务的提交给businessExecutor

7.5 调用io.mycat.mycat2.sqlparser.MatchMethodGenerator#initShrinkCharTbl方法.该方法只是对0-9a-zA-Z的字符进行映射.代码如下:

static final byte[] shrinkCharTbl = new byte[96];//为了压缩hash字符映射空间,再次进行转义
public static void initShrinkCharTbl () {
    shrinkCharTbl[0] = 1;//从 $ 开始计算
    IntStream.rangeClosed('0', '9').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'0'+2));
    IntStream.rangeClosed('A', 'Z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'A'+12));
    IntStream.rangeClosed('a', 'z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'a'+12));
    shrinkCharTbl['_'-'$'] = (byte)38;
}
  1. 调用ProxyStarter的start,首先 是创建了NIOAcceptor,负责对请求进行处理.然后根据ClusterConfig的配置进行相应的处理.
ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER);
ClusterBean clusterBean = clusterConfig.getCluster();
if (clusterBean.isEnable()) {
    // 启动集群
    startCluster(runtime, clusterBean, acceptor);
} else {
    // 未配置集群,直接启动
    startProxy(true);
}

9.接下来分析未配置集群的启动方式.

9.1 首先通过ProxyRuntime获取到ProxyConfig(该对象是mycat.yml的封装),mycat.yml的配置文件如下:

proxy:
  ip: 0.0.0.0
  port: 8066

因此可以通过该类拿到启动端口和ip.因此传入NIOAcceptor进行监听.

9.2 在io.mycat.proxy.NIOAcceptor#startServerChannel中,做了如下处理:

9.2.1 首先检查ServerSocketChannel是否已经启动,如果已经启动,不进行后续处理.

9.2.2 根据传入的ServerType做不同的处理,当前我们传入的值为MYCAT. 因此不进行处理.

if (serverType == ServerType.CLUSTER) {
    adminSessionMan = new DefaultAdminSessionManager();
    ProxyRuntime.INSTANCE.setAdminSessionManager(adminSessionMan);
    logger.info("opend cluster conmunite port on {}:{}", ip, port);
} else if (serverType == ServerType.LOAD_BALANCER){
    logger.info("opend load balance conmunite port on {}:{}", ip, port);
    ProxyRuntime.INSTANCE.setProxySessionSessionManager(new ProxySessionManager());
    ProxyRuntime.INSTANCE.setLbSessionSessionManager(new LBSessionManager());
}

9.2.3 调用io.mycat.proxy.NIOAcceptor#getServerSocketChannel,进行启动.代码如下:

private void openServerChannel(Selector selector, String bindIp, int bindPort, ServerType serverType)
    throws IOException {
final ServerSocketChannel serverChannel = ServerSocketChannel.open();
final InetSocketAddress isa = new InetSocketAddress(bindIp, bindPort);
serverChannel.bind(isa);
serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.register(selector, SelectionKey.OP_ACCEPT, serverType);
if (serverType == ServerType.CLUSTER) {
    logger.info("open cluster server port on {}:{}", bindIp, bindPort);
    clusterServerSocketChannel = serverChannel;
} else if (serverType == ServerType.LOAD_BALANCER) {
    logger.info("open load balance server port on {}:{}", bindIp, bindPort);
    loadBalanceServerSocketChannel = serverChannel;
} else {
    logger.info("open proxy server port on {}:{}", bindIp, bindPort);
    proxyServerSocketChannel = serverChannel;
}
}

9.4 调用io.mycat.mycat2.ProxyStarter#startReactor,进行MycatReactorThread的启动.该线程负责对请求进行处理.

9.5 加载配置文件信息,该方法在ROOT_PATH目录下创建了prepare和archive两个文件夹,并加载了replica-index.yml,datasource.yml,schema.yml,最后调用了AnnotationProcessor#getInstance 进行初始化.

9.5.1 创建DynamicAnnotationManager.

9.5.2 对AnnotationProcessor.class.getClassLoader().getResource("") 的路径进行监听.当文件目录有变化时会回调io.mycat.mycat2.sqlannotations.AnnotationProcessor#listen方法.代码如下:

public static void listen() {
try {
while (true) {
    WatchKey key = watcher.take();//todo 线程复用,用 poll比较好?
    boolean flag = false;
    for (WatchEvent<?> event: key.pollEvents()) {
        String str = event.context().toString();
        if ("actions.yml".equals(str)|| "annotations.yml".equals(str)) {
            flag=true;
            break;
        }
    }
    if (flag){
        System.out.println("动态注解更新次数" + count.incrementAndGet());
        init();
    }
    boolean valid = key.reset();
    if (!valid) {
        break;
    }

}
} catch (Exception e) {
e.printStackTrace();
}
}

==可以看到,当actions.yml或者annotations.yml变化时,就会重新调用init方法,从而对DynamicAnnotationManager进行修改.==

9.6 分别对datasource.yml 和 schema.yml 进行加载.

9.6.1 对MySQLRepBean进行初始化.该bean是对datasource.yml的封装.datasource.yml如下所示:

replicas:
  - name: test                      # 复制组 名称   必须唯一
    repType: MASTER_SLAVE           # 复制类型
    switchType: SWITCH              # 切换类型
    balanceType: BALANCE_ALL_READ   # 读写分离类型
    mysqls:            
      - hostName: mysql-01              # mysql 主机名
        ip: 127.0.0.1               # ip
        port: 3306                  # port
        user: root                  # 用户名
        password: root            # 密码
        minCon: 1                   # 最小连接
        maxCon: 10                  # 最大连接
        maxRetryCount: 3            # 连接重试次数

conf.getMysqlRepMap().forEach((repName, repBean) -> {
    repBean.initMaster();
    repBean.getMetaBeans().forEach(metaBean -> metaBean.prepareHeartBeat(repBean, repBean.getDataSourceInitStatus()));
});

9.6.2 调用io.mycat.mycat2.beans.MySQLRepBean#initMaster, 做了如下处理:

首先加载replica-index.yml,获取replica name 对应的index,并对MySQLRepBean中的metaBeans进行赋值,完成对mysqls 的封装.

之后调用MySQLMetaBean的prepareHeartBeat方法.
完成

9.7 ==调用io.mycat.proxy.ProxyRuntime#startHeartBeatScheduler,启动heartbeatScheduler线程.该线程会每10000 ms 调用io.mycat.proxy.ProxyRuntime#replicaHeartbeat.== 心跳的配置在heartbeat.yml中,默认配置如下:

heartbeat:
  timerExecutor: 2
  replicaHeartbeatPeriod: 10000
  replicaIdleCheckPeriod: 2000
  idleTimeout: 2000
  processorCheckPeriod: 2000
  minSwitchtimeInterval: 120000

9.7.1 在io.mycat.proxy.ProxyRuntime#replicaHeartbeat,方法中代码如下:

private Runnable replicaHeartbeat() {
return ()->{
ProxyReactorThread<?> reactor  = getReactorThreads()[ThreadLocalRandom.current().nextInt(getReactorThreads().length)];
reactor.addNIOJob(()-> config.getMysqlRepMap().values().stream().forEach(f -> f.doHeartbeat()));
};
}

最终会调用io.mycat.mycat2.beans.heartbeat.MySQLHeartbeat#heartbeat.

9.8 对cluster的配置进行处理.代码如下:

BalancerConfig balancerConfig = conf.getConfig(ConfigEnum.BALANCER);
BalancerBean balancerBean = balancerConfig.getBalancer();
// 集群模式下才开启负载均衡服务
if (clusterBean.isEnable() && balancerBean.isEnable()) {
    runtime.getAcceptor().startServerChannel(balancerBean.getIp(), balancerBean.getPort(), ServerType.LOAD_BALANCER);
}
上一篇下一篇

猜你喜欢

热点阅读