美团Cat源码浅析(一)Java客户端初始化流程

2018-11-23  本文已影响64人  李亚林1990

最近对公司的监控系统Cat从源码角度去做了一下了解。为方便后续学习Cat的同学做一下借鉴,现尝试从Java客户端的初始化流程、Java客户端消息发送、消息协议、服务端消息分发等几个方面来浅析Cat源码。有不对的地方,望大家多多指正!
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat
本系列博客图片主要来自官方说明文档。ps: 人家图画的太清晰了

Java客户端的初始化流程
一、消息的组织
Cat使用消息树(MessageTree)组织日志,下面为消息树的类定义


image.png

我们每次操作的实体都是消息树,其中有个domain字段,这是cat中一个非常重要的概念,一个domain可以对应成一个project( 比如bluewhale-app),每个消息树拥有一个唯一的MessageId, 不同的消息树(比如微服务中A服务调用B服务,A,B都会生成消息树) 通过 parenMessageId、rootMessageId 串联起来,消息树下的所有实体都是Message,一共有5种类型的Message, 分别是Transaction, Event, Trace, Metric和Heartbeat。
二、Java客户端的初始化
客户端操作对象Cat封装了所有的接口。下面通过上报一个Transaction类型的消息来了解客户端的初始化流程。

class Cat{
    public static Transaction newTransaction(String type, String name) {
        return Cat.getProducer().newTransaction(type, name);
    }

    public static MessageProducer getProducer() {
        checkAndInitialize();
        return s_instance.m_producer;
    }

    private static void checkAndInitialize() {
        initialize(new File(getCatHome(), "client.xml"));
    }

    public static void initialize(File configFile) {
        //IOC容器
        PlexusContainer container = ContainerLoader.getDefaultContainer();
        ModuleContext ctx = new DefaultModuleContext(container);
        //主要逻辑在CatClientModule.class
        Module module = ctx.lookup(Module.class, CatClientModule.ID);

        if (!module.isInitialized()) {
            ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
            ctx.setAttribute("cat-client-config-file", configFile);
            initializer.execute(ctx, module);
        }
    }
}

创建transaction首先会通过getProducer函数获取消息生产者MessageProducer对象,在返回MessageProducer对象之前,函数会对客户端进行初始化。初始化plexus容器,调用CatClientModule的excute方法。

class CatClientModule {
    @Override
    protected void execute(final ModuleContext ctx) throws Exception {
    ...
        // bring up TransportManager
        ctx.lookup(TransportManager.class);

        ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

        if (clientConfigManager.isCatEnabled()) {
            // start status update task
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
            Threads.forGroup("cat").start(statusUpdateTask);
        }
    }
}

上面代码主要需要关注两个点:
1、DefaultTransportManager的实例化,最终启动Netty Tcp客户端。
2、启动一个StatusUpdateTask线程每隔一段时间发送一个HeartBeatMessage,其中包括了客户端能拿到的各种信息,包括CPU,Memory,Disk等等,通过访问者模式采集这些信息到StatusInfo。之后将status封装到HeartBeatMessage中,按照一般对于message的处理流程,flush到消息传输层中。
接下来我们来看Netty Tcp客户端的启动。

class DefaultTransportManager {
    @Inject
    private TcpSocketSender m_tcpSocketSender;
    public void initialize() {
        ...
        List<Server> servers = m_configManager.getServers();
        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
        for (Server server : servers) {
            addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
        }
        m_tcpSocketSender.initialize(addresses);
        ...
    }
}

上面代码根据server配置信息去初始化TcpSocketSender。

class TcpSocketSender {
    private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
    public void initialize(List<InetSocketAddress> addresses) {
        //初始化netty客户端,建立TCP连接。
        m_channelManager = new ChannelManager(m_logger, addresses, m_configManager, m_factory);
        //启动一个线程从消息队列获取消息并上报
        Threads.forGroup("cat").start(this);
        //监听server配置信息变化,重连
        Threads.forGroup("cat").start(m_channelManager);
    }
    //从消息队列获取消息并上报
    public void run() {
        m_active = true;
        while (true) {
            MessageTree tree = m_queue.poll();
            if (tree != null) {
                ChannelFuture channel = m_channelManager.channel();
                if (channel != null) {
                    sendInternal(channel, tree);
                } else {
                    offer(tree);
                }
            } else {
                break;
            }
        }
    }
}

上面代码主要干了两件事,实例化ChannelManager,初始化netty客户端,建立TCP长连接。启动一个线程执行run()函数从消息队列获取消息并上报。

class ChannelManager {
    public ChannelManager(List<InetSocketAddress>serverAddresses) {
        EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            }
        });
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
            }
        });
        m_bootstrap = bootstrap;
        ...
        initChannel(configedAddresses, routerConfig);
        ...
    }
    private ChannelFuture createChannel(InetSocketAddress address) {
          ...
        ChannelFuture future = m_bootstrap.connect(address);
        ...
    }
}

最后,来一张图。


image.png
上一篇下一篇

猜你喜欢

热点阅读