MyCAT源码分析(二)服务启动流程

2019-08-10  本文已影响0人  李亚林1990

本篇将根据启动类MycatStartup.java来概览MyCAT的启动流程。
为了简明清晰,所有源码只列出关键代码加以说明,只做NIO网络框架的相关分析。

Mycat的主要启动流程在单例MycatServer.java中,我们先看下构造方法。

private MycatServer() {
        //读取文件配置
        this.config = new MycatConfig();

        //缓存服务初始化
        cacheService = new CacheService();
        
        //路由计算初始化
        routerService = new RouteService(cacheService);

                //SQL解析器
        sqlInterceptor = (SQLInterceptor) Class.forName(
                    config.getSystem().getSqlInterceptor()).newInstance();
}

主要逻辑说明:
1、读取配置文件。
即schema.xml、server.xml、rule.xml。mycat支持两种配置,zk或者本地文件,方式配置项在myid.properties文件中的loadZk=false or true。基于zk的配置方式会从zk将约定路径上的配置内容读取到本地文件,在此行代码解析到MycatConfig.java类中。

2、缓存和路由计算初始化。
缓存的配置信息在cacheservice.properties,默认实现为factory.encache=io.mycat.cache.impl.EnchachePooFactory。支持可配各个场景的缓存实现,缓存空间的大小,以及缓存的超时时间。例:
pool.SQLRouteCache=encache,10000,1800
layedpool.TableID2DataNodeCache=encache,10000,18000
配置了RouteService的本地缓存池。
备注:
SQLRouteCache的缓存key为 schema+SQL语句,value为 RouteResultset路由信息
TableID2DataNodeCache的缓存key为 id 的值,value为 节点名

接下来我们分析MycatServer.startup()方法,该方法中完成了Mycat网络通信框架中前端、后端通信的关键组件的初始化。

public void startup() throws IOException {
        //创建对SocketChannel进行封装的工厂类
    ServerConnectionFactory sf = new ServerConnectionFactory();

        //创建缓存块pool,支持可配,默认使用直接内存
    bufferPool = new DirectByteBufferPool(bufferPoolPageSize,bufferPoolChunkSize,
                    bufferPoolPageNumber,system.getFrontSocketSoRcvbuf());

        //创建业务线程池和异步处理器
        processors = new NIOProcessor[processorCount];
        //业务线程池的大小可以适当小一些,Mycat主要处理流程都是在reactor线程组中完成的
    businessExecutor = ExecutorUtil.create("BusinessExecutor",
                threadPoolSize);
    for (int i = 0; i < processors.length; i++) {
            processors[i] = new NIOProcessor("Processor" + i, bufferPool,
                    businessExecutor);
    }

        //创建NIOReactorPool, 默认大小为机器cpu核心线程数
    NIOReactorPool reactorPool = new NIOReactorPool(
                    DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOREACTOR",
                    processors.length);

        //创建NIOConnector
    connector = new NIOConnector(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOConnector", reactorPool);
            ((NIOConnector) connector).start();

        //创建并启动NIOAcceptor
    server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
                    + "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);
        server.start();
        
        //初始化数据库连接,调度心跳监听任务
    Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
        for (PhysicalDBPool node : dataHosts.values()) {
            String index = dnIndexProperties.getProperty(node.getHostName(),"0");
            if (!"0".equals(index)) {
                LOGGER.info("init datahost: " + node.getHostName() + "  to use datasource index:" + index);
            }
            node.init(Integer.parseInt(index));
            node.startHeartbeat();
        } 
        scheduler.scheduleAtFixedRate(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(),TimeUnit.MILLISECONDS);       
}

主要逻辑说明:
1、ServerConnectionFactory
封装SocketChannel为ServerConnection, 设置数据包handler处理器,创建当前连接的NonBlockingSession

2、DirectByteBufferPool
预创建缓存块pool

3、NIOReactorPool-线程组
NIOReactor线程组,用来监听并处理前端和后端的SelectionKey.OP_READ读事件,可以看到NIOConnector和NIOAcceptor都持有了reactorPool,创建连接后都交给reactorPool去处理。

4、NIOConnector-单线程
创建数据库连接,监听并处理SelectionKey.OP_CONNECT事件

5、NIOAcceptor-单线程
接收客户端(也就是业务服务)连接请求,监听并处理SelectionKey.OP_ACCEPT事件

本篇分析到此结束。

转载请备注原文链接。

上一篇下一篇

猜你喜欢

热点阅读