互联网技术栈

Elasticsearch客户端源码分析

2017-06-03  本文已影响0人  周艺伟

背景

简单的介绍一下客户端启动的大体流程,在这里客户端与ES集群连接的方式是通过TCP的形式连接,本次分析基于Elasticsearch-2.3.4的版本。


初始化

想要通过TCP的形式连接到ES集群,需要指定几个参数:

初始化代码如下:

Settings settings = Settings.settingsBuilder().put("cluster.name", ES_CLUSTER_NAME).put("client.transport.sniff", true).build();
client = TransportClient.builder().settings(settings).build();
for (String esAddressPerNode : ES_ADDRESS.split("\\,")) {
   try {
      //添加初始化节点
      client = ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esAddressPerNode), ES_PORT));
   } catch (UnknownHostException e) {
      log.warn("client.addTransportAddress error", e);
   }
}
log.info("connectedNodes:" + ((TransportClient)client).connectedNodes());

可以看到ES通过builder模式来初始化客户端,我们简单的跟到build()方法进去,看看ES是怎么构造TransportClient的:

public TransportClient build() {
    Settings settings = InternalSettingsPreparer.prepareSettings(this.settings);
    settings = settingsBuilder()
            .put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval
            .put(settings)
            .put("network.server", false) //是否作为一个Netty服务端启动
            .put("node.client", true) //该节点是客户端节点,ES集群中另外还会有其他三种类型的节点:master、data、ingest(5.0版本才支持的,用于加工转换index前的原数据)
            .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) //客户端模式=transport
            .build();
 
    PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses);
    this.settings = pluginsService.updatedSettings();
 
    Version version = Version.CURRENT;
 
    final ThreadPool threadPool = new ThreadPool(settings); //初始化客户端线程池,客户端对ES的各类请求都独立创建了一个线程池,做到一定程度上的隔离
    NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
 
    boolean success = false;
    try {
        //开始添加客户端初始化需要的模块
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(new Version.Module(version));
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.nodeModules()) {
            modules.add(pluginModule);
        }
        modules.add(new PluginsModule(pluginsService));
        modules.add(new SettingsModule(this.settings));
        modules.add(new NetworkModule(namedWriteableRegistry));
        modules.add(new ClusterNameModule(this.settings));
        modules.add(new ThreadPoolModule(threadPool));
        modules.add(new TransportModule(this.settings, namedWriteableRegistry));
        modules.add(new SearchModule() {
            @Override
            protected void configure() {
                // noop
            }
        });
        modules.add(new ActionModule(true));
        modules.add(new ClientTransportModule());
        modules.add(new CircuitBreakerModule(this.settings));
 
        pluginsService.processModules(modules);
 
        Injector injector = modules.createInjector();
        //利用Guice创建TransportService
        final TransportService transportService = injector.getInstance(TransportService.class);
        transportService.start(); //启动transportService,放到下面讲,相当于调用TransportService.doStart()
        transportService.acceptIncomingRequests(); //放开阻塞请求的CountDownLatch
 
        TransportClient transportClient = new TransportClient(injector); //实例化Client返回
        success = true;
        return transportClient;
    } finally {
        if (!success) {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}
  
TransportService.class {
    protected void doStart() {
        ...
        transport.start();  //在这里transport=NettyTransport,所以我们接着看NettyTransport启动做了什么
        ...
    }
}
  
NettyTransport.class {
    protected void doStart() {
        boolean success = false;
        try {
            clientBootstrap = createClientBootstrap(); //创建Netty的客户端启动器
            if (settings.getAsBoolean("network.server", true)) { //客户端同样是Netty的客户端,可以从上文看到network.server=false
                ...
                ...
            }
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
  
    private ClientBootstrap createClientBootstrap() {
        if (blockingClient) {
            clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings,                 TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
        } else { //默认都是NIO模式
            int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
            clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
                bossCount,
                new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
                new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
            ... //Netty的一些启动配置
            return clientBootstrap; //到这里还没绑定绑定本地端口
        }
    }
}
//最后是实例化TransportClient
TransportClient.class {
    private TransportClient(Injector injector) {
        super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
        this.injector = injector;
        nodesService = injector.getInstance(TransportClientNodesService.class);
        proxy = injector.getInstance(TransportProxyClient.class);
    }
}

添加节点

从前面可以看到我们通过TransportClient.addTransportAddress(TransportAddress transportAddress)来实现的,具体深入看看细节点:

TransportClient.class {
    private final TransportClientNodesService nodesService;
    private final TransportProxyClient proxy;
    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }
}
  
//再往下看TransportClientNodesService的实现
TransportClientNodesService.class {
    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        synchronized (mutex) {
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    if (otherNode.address().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>();
            builder.addAll(listedNodes());
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            listedNodes = Collections.unmodifiableList(builder);
            //上面仅仅只是简单的添加初始化的集群节点而已,随后客户端开始嗅探集群中的其他节点
            nodesSampler.sample();
        }
        return this;
    }
}
  
//接着看看nodesSampler.sample()的实现,调用了SniffNodesSampler.doSample()
class SniffNodesSampler extends NodeSampler {
 
    @Override
    protected void doSample() {
        //listedNodes表示初始化时用户手动指定的节点
        //nodes表示上一轮更新的集群节点
        Set<DiscoveryNode> nodesToPing = Sets.newHashSet();
        for (DiscoveryNode node : listedNodes) {
            nodesToPing.add(node);
        }
        for (DiscoveryNode node : nodes) {
            nodesToPing.add(node);
        }
 
        final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
        for (final DiscoveryNode listedNode : nodesToPing) {
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (!transportService.nodeConnected(listedNode)) { //如果还没有与节点建立连接
                            try {
                                if (nodes.contains(listedNode)) {
                                    logger.trace("connecting to cluster node [{}]", listedNode);
                                    transportService.connectToNode(listedNode); //建立五组连接,五种请求类型的连接
                                } else {
                                    logger.trace("connecting to listed node (light) [{}]", listedNode);
                                    transportService.connectToNodeLight(listedNode); //作为轻连接,仅仅建立一个连接,所有请求类型都用这个连接
                                }
                            } catch (Exception e) {
                                logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
                                latch.countDown();
                                return;
                            }
                        }
                        //向每个节点请求集群节点信息,在这里特意说一点:ES节点在收到集群状态信息的请求都会统一再转给Master节点,只有Master才能修改和发布集群状态信息
                        transportService.sendRequest(listedNode, ClusterStateAction.NAME,
                                headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                                new BaseTransportResponseHandler<ClusterStateResponse>() {
 
                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }
 
                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }
 
                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(listedNode, response);
                                        latch.countDown();
                                    }
 
                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
                                        transportService.disconnectFromNode(listedNode);
                                        latch.countDown();
                                    }
                                });
                    } catch (Throwable e) {
                        logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
                        transportService.disconnectFromNode(listedNode);
                        latch.countDown();
                    }
                }
            });
        }
 
        try {
            latch.await();
        } catch (InterruptedException e) {
            return;
        }
 
        HashSet<DiscoveryNode> newNodes = new HashSet<>(); //新的集群所有节点信息
        HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>(); //非本ES集群的节点
        for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
            if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                logger.warn("node {} not part of the cluster {}, ignoring...", entry.getValue().getState().nodes().localNode(), clusterName);
                newFilteredNodes.add(entry.getKey());
                continue;
            }
            for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().dataNodes().values()) {
                newNodes.add(cursor.value);
            }
        }
 
        nodes = validateNewNodes(newNodes); //校验新的集群节点,对于没有建立过连接的节点建立连接,失败就移除
        filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        //到此我们就已经完成在初始化添加节点的时候完成嗅探集群中的其他节点
    }
  
    //TransportClientNodesService在实例化的时候也启动了一个定时任务,用来定时更新本地的集群节点信息,默认是5s一次
    class ScheduledNodeSampler implements Runnable {
        @Override
        public void run() {
            try {
                nodesSampler.sample(); //调用上文的SniffNodesSampler.doSample()
            } catch (Exception e) {
                logger.warn("failed to sample", e);
            }
        }
    }
}

再来具体说说与节点建立连接的时候,区分"轻"与"重"的连接方式:

protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    ChannelFuture connect = clientBootstrap.connect(address);
    connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
    if (!connect.isSuccess()) {
        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
    }
    Channel[] channels = new Channel[1];
    channels[0] = connect.getChannel();
    channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
    return new NodeChannels(channels, channels, channels, channels, channels);
}
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    //建立五组不同请求类型的连接
    ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; //size=2
    ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; //size=3
    ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; //size=6
    ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; //size=1
    ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; //size=1
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    for (int i = 0; i < connectRecovery.length; i++) {
        connectRecovery[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectBulk.length; i++) {
        connectBulk[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectReg.length; i++) {
        connectReg[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectState.length; i++) {
        connectState[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectPing.length; i++) {
        connectPing[i] = clientBootstrap.connect(address);
    }
 
    ...
}

负载均衡

通过轮询(Round Robbin)的方式从"重连接"的节点列表里发送请求,代码如下:

private final AtomicInteger randomNodeGenerator = new AtomicInteger();
  
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
    List<DiscoveryNode> nodes = this.nodes;
    int index = getNodeNumber();
    DiscoveryNode node = nodes.get((index) % nodes.size());
    ...
}
  
private int getNodeNumber() {
    int index = randomNodeGenerator.incrementAndGet();
    if (index < 0) {
        index = 0;
        randomNodeGenerator.set(0);
    }
    return index;
}

总结

到这,我们大概梳理了一遍ES客户端的启动过程,文中如果有写的不对的地方还请指正

上一篇下一篇

猜你喜欢

热点阅读