Laboratory程序员消息队列MQ(Kafka&RabbitMQ)

ActiveMQ源码解析(四)Network Bridge

2017-10-02  本文已影响58人  MisterCH
ActiveMQ集群网络

在多个ActiveMQ Broker组成的集群(Network of broker)中,是通过Network Bridge机制来实现的。在阅读源码前,我一直有误解,以为AMQ之间在duplex=false的场景下互相通讯是互相建立消费者客户端,如果生产者连接Broker A,而消费者连接Broker B,那么应该是Broker B先从Broker A上消费消息再发给自己的消费者。可是实际看起来并不是这种简单的机制。今天这篇源码解析,由于牵涉的代码比较多,我尽量用较少的源码来整理出每个方法的逻辑。

AMQ的通讯的确是通过一个虚拟的客户端实现的,但是这个客户端在AMQ的源码里称为Network Bridge。在集群建立的时候,如果使用duplex=false(一般在mesh结构,也就是N个Broker完全互连的场景下)的配置,每个Broker上都会出现两个连接:

这里所谓的指令指的是例如消费者创建,Broker异常,连接异常等控制指令,Network Bridge接收到这些指令后需要进行相应的处理。而消息则是Message,收到后需要进行ack或者转发给自己的消费者。

看看源码吧,在AMQ的源码中起到主要作用(default配置)的是DemandForwardingBridgeSupport类。这个类中有几个主要的方法:

  1. start(),stop(),启动和停止bridge
  2. serviceLocalCommand 处理本地指令和消息
  3. serviceRemoteCommand 处理其他MQ传过来的指令和消息

先来看start()方法吧,主要功能:
1. 判断是否是双向连接,如果是就多建一个监听器
2. 建立local和remote的transportListener监听器
3. 启动network bridge

    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {

            if (brokerService == null) {
                throw new IllegalArgumentException("BrokerService is null on " + this);
            }

            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
            //如果是双向连接,额外建立一个监听器
            if (isDuplex()) {
                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {

                    @Override
                    public void onCommand(Object o) {
                        Command command = (Command) o;
                        serviceLocalCommand(command);
                    }

                    @Override
                    public void onException(IOException error) {
                        serviceLocalException(error);
                    }
                });
                duplexInboundLocalBroker.start();
            }
            // 设定Local监听器,处理本地指令和消息
            localBroker.setTransportListener(new DefaultTransportListener() {

                @Override
                public void onCommand(Object o) {
                    Command command = (Command) o;
                    serviceLocalCommand(command);
                }

                @Override
                public void onException(IOException error) {
                    if (!futureLocalBrokerInfo.isDone()) {
                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
                        futureLocalBrokerInfo.cancel(true);
                        return;
                    }
                    serviceLocalException(error);
                }
            });
            // 设定Remote监听器,处理别的MQ的指令和消息
            remoteBroker.setTransportListener(new DefaultTransportListener() {

                @Override
                public void onCommand(Object o) {
                    Command command = (Command) o;
                    serviceRemoteCommand(command);
                }

                @Override
                public void onException(IOException error) {
                    if (!futureRemoteBrokerInfo.isDone()) {
                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
                        futureRemoteBrokerInfo.cancel(true);
                        return;
                    }
                    serviceRemoteException(error);
                }
            });

            // remoteBroker和localBroker的类其实是transport,也就是说这俩货
            // 其实是两个连接。
            remoteBroker.start();
            localBroker.start();
             
            // 如果连接未被抛弃
            if (!disposed.get()) {
                try {
                    // 启动
                    triggerStartAsyncNetworkBridgeCreation();
                } catch (IOException e) {
                    LOG.warn("Caught exception from remote start", e);
                }
            } else {
                LOG.warn("Bridge was disposed before the start() method was fully executed.");
                throw new TransportDisposedIOException();
            }
        }
    }

启动bridge的方法是triggerStartAsyncNetworkBridgeCreation(),在该方法中主要是启动一个做了两件事:
1. 调用collectBrokerInfos()方法收集本地和异地的brokerInfo信息,在该方法中有逻辑来处理remoteBroker和localBroker是相同Broker的情况,防止出现环。
2. 调用doStartLocalAndRemoteBridges()方法启动本地和异地的连接(也就是localBroker和remoteBroker)。在该方法中主要是通过startLocalBridge()和startRemoteBridge()来实现连接的启动。

startLocalBridge和startRemoteBridge()稍有点复杂,源码就不上了。我整理了一下,startLocalBridge主要实现的事情包括了:
1. 往本地MQ建立了一个ClientID为 NetworkConnectorName_RemoteBrokerName_inbound_LocalBrokerNam的连接信息connectionInfo,同时建立会话信息sessionInfo,并告知本地broker,注意:这个连接信息会出现在要被转发的topic或者queue的消费者列表里。
2. 如果是双向连接,还需要再建立一个连接
3. 建立一个network bridge的连接监听器NetworkBridgeListener
4. 告知本地Broker有其他MQ试图建立连接

而startRemoteBridge主要实现的事情包括了:
1. 处理dynamicallyIncludedDestinationsKey和staticallyIncludedDestinationsKey的配置的事宜
2. 建立一个clientID为NetworkName_LocalBrokerName_outbound的连接信息connectionInfo,同时建立会话信息sessionInfo,生产者producerInfo,并告知连接另一端的Broker。
3. 建立一个监听对端Broker(以下称为RB)的所有消费者建立和断开的Advisory Topic系统Topic:ActiveMQ.Advisory.Consumer.> 的消费者信息,并发送给RB。

startRemoteBridge()中第2、3两部分的代码如下:

              // 把之前没关闭连接的先告知RB关闭了。
                if (remoteConnectionInfo != null) {
                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
                }
               // 新建一个remoteConnectionInfo
                remoteConnectionInfo = new ConnectionInfo();
                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
                remoteConnectionInfo.setUserName(configuration.getUserName());
                remoteConnectionInfo.setPassword(configuration.getPassword());
                remoteBroker.oneway(remoteConnectionInfo);
                // 新建一个sessionInfo,我理解是用于建立生产者
                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
                remoteBroker.oneway(remoteSessionInfo);
                // 基于建立的session建立生产者
                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
                producerInfo.setResponseRequired(false);
                remoteBroker.oneway(producerInfo);
                // Listen to consumer advisory messages on the remote broker to determine demand.
                // 如果在xml中没有配置staticBridge,就表示是demanForwardBridge,需要建立监听RB的消费者连接和断开的消费者。
                if (!configuration.isStaticBridge()) {
                    // 发现仍然使用remoteSessionInfo来建立消费者。话说我记得好像同一个session里不能同时有生产者和消费者?
                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
                    // always dispatch advisory message asynchronously so that
                    // we never block the producer broker if we are slow
                    demandConsumerInfo.setDispatchAsync(true);
                    String advisoryTopic = configuration.getDestinationFilter();
                    if (configuration.isBridgeTempDestinations()) {
                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                    }
                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
                    configureConsumerPrefetch(demandConsumerInfo);
                    remoteBroker.oneway(demandConsumerInfo);
                }

注意:代码里所有的remoteBroker.oneway(command)其实都可以视为一个向RB发送指令和消息的动作,oneway表示异步发送,当然还有同步发送,在startLocalBridge方法里有,往本地发送连接信息的时候用到的:
Object resp = localBroker.request(localConnectionInfo);
此外还有异步发送,但是需要接收并处理反馈的方法,在serviceRemoteCommand方法里,用于发送持久化消息的,代码是这样写的:

remoteBroker.asyncRequest(message, new ResponseCallback() {
    @Override
    public void onCompletion(FutureResponse future) {
        try {
          Response response = future.getResult();
          if (response.isException()) {
               ExceptionResponse er = (ExceptionResponse) response;
               serviceLocalException(md, er.getException());
          } else {
               localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
               networkBridgeStatistics.getDequeues().increment();
          }
       } catch (IOException e) {
          serviceLocalException(md, e);
       } finally {
          sub.decrementOutstandingResponses();
       }
  }
});

言归正传,从上面的几个方法里其实可以看到,在bridge启动的时候,建立一个通向本地的连接信息localConnectionInfo和会话信息localSessionInfo,建立了一个通向RB的连接信息remoteConnectionInfo、会话remoteSessionInfo、生产者信息producerInfo、和对advisory topic的消费者信息demandConsumerInfo。这些对象都是Info结尾的,因为这些都是JMS中规范中的概念,netwokr bridge里建立这些是用来迎合JMS规范的。实际这些都是抽象的模型。我们只需要记住两个连接已经建立了,分别是localBroker和remoteBroker。

下面是最主要的两个方法serviceLocalCommand和serviceRemoteCommand。由于方法比较长,这里介绍主要的逻辑和重要的代码。

serviceLocalCommand,用于处理localBroker这个连接从本地收到的指令和消息,localBroker中传来的消息主要有几种:

  1. 消息分发,根据RB告知的其存在的消费者,判断是否需要转发,有几种情况:
    a. 双向连接且消息是advisory消息,直接使用remoteBroker.oneway(message);发送
    b. 持久化消息或在Broker端配置了始终使用同步发送,使用remoteBroker.asyncRequest()方式发送,也就是异步需反馈的方法发送。等到对端RB发送了收到消息的回执后,会往本地发送一个消息的ack消息localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
    c. 如果消息本身就是被异步发送过来的,那直接调用一个remoteBroker.oneway(message)方法把消息发送给对端,并且调用一个localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));方法告知本地。
  2. BrokerInfo消息,相当于更新本地BrokerInfo,直接保存到一个对象里即可。
    futureLocalBrokerInfo.set((BrokerInfo) command);
  3. Broker Stop信息,相当于本地broker要关闭了,那就执行一个stop方法。
LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
stop();
  1. ConnectionError连接异常的消息,执行异常处理操作:
ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());
  1. 还有其他的一些消息,不负责处理:
switch (command.getDataStructureType()) {
case WireFormatInfo.DATA_STRUCTURE_TYPE:
    break;
case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
    break;
default:
    LOG.warn("Unexpected local command: {}", command);
}

serviceRemoteCommand方法,用于处理remoteBroker这个连接从RB收到的指令和消息,localBroker中传来的消息主要有几种:

  1. 消息分发。由于remoteBroker只负责advisory的监听,所以只会处理advisory message。serviceRemoteConsumerAdvisory这个方法会根据收到的consumer建立,销毁等动作,将RB的消费者信息加入或移除出本地的network consumer列表。
safeWaitUntilStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
  1. BrokerInfo消息,相当于更新异地BrokerInfo,直接保存到一个对象里即可。
    futureRemoteBrokerInfo.set((BrokerInfo) command);
  2. BrokerSubscriptionInfo,处理dynamicallyIncludedDestinations等特殊性质的订阅。
  3. ConnectionError连接异常的消息,执行异常处理操作
  4. 双向连接发来的信息,需要单独处理一遍,不赘述
  5. 其他类型消息,均不作处理,包括KeepAliveInfo,WireFormatInfo,ShutdownInfo等等。

在serviceRemoteCommand方法中,有个很重要的方法是serviceRemoteConsumerAdvisory,该方法用于处理收到的advisoryMessage,主要是消费者的建立和断开,主要是关于networkTTL的判断,避免出现环路。还有对非持久化订阅和持久化订阅的分别处理。代码奉上:

    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
        final int networkTTL = configuration.getConsumerTTL();
        // 收到消费者信息,判断是否需要建一个新的本地network订阅。
        if (data.getClass() == ConsumerInfo.class) {
            // Create a new local subscription
            ConsumerInfo info = (ConsumerInfo) data;
            BrokerId[] path = info.getBrokerPath();

            if (info.isBrowser()) {
                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
                return;
            }

            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
                });
                return;
            }

            if (contains(path, localBrokerPath[0])) {
                // Ignore this consumer as it's a consumer we locally sent to the broker.
                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, info
                });
                return;
            }

            if (!isPermissableDestination(info.getDestination())) {
                // ignore if not in the permitted or in the excluded list
                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
                });
                return;
            }

            // in a cyclic network there can be multiple bridges per broker that can propagate
            // a network subscription so there is a need to synchronize on a shared entity
            // if duplicate suppression is required
            if (isDuplicateSuppressionOff(info)) {
                addConsumerInfo(info);
            } else {
                synchronized (brokerService.getVmConnectorURI()) {
                    addConsumerInfo(info);
                }
            }
        } else if (data.getClass() == DestinationInfo.class) {
            // It's a destination info - we want to pass up information about temporary destinations  说实话没太明白这个是干嘛的
            final DestinationInfo destInfo = (DestinationInfo) data;
            BrokerId[] path = destInfo.getBrokerPath();
            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
                        configuration.getBrokerName(), destInfo, networkTTL
                });
                return;
            }
            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
                return;
            }
            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
                // re-set connection id so comes from here
                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
            }
            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
            });
            if (destInfo.isRemoveOperation()) {
                // Serialize with removeSub operations such that all removeSub advisories
                // are generated
                serialExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            localBroker.oneway(destInfo);
                        } catch (IOException e) {
                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
                        }
                    }
                });
            } else {
                localBroker.oneway(destInfo);
            }
        } else if (data.getClass() == RemoveInfo.class) {
            // 如果是消费断开连接的信息,则判断是否从subscription中移除
            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
            removeDemandSubscription(id);

            if (forcedDurableRemoteId.remove(id)) {
                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                    DemandSubscription ds = i.next();
                    boolean removed = ds.removeForcedDurableConsumer(id);
                    if (removed) {
                        cleanupDurableSub(ds, i);
                    }
                }
           }

        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                DemandSubscription ds = i.next();
                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
                if (removed) {
                    cleanupDurableSub(ds, i);
                }
            }
        }
    }

至此我们大致可以明白MQ的连接之间的套路了。两台MQ之间建立连接,Network Bridge会出现两个transport,一个是localBroker,一个是remoteBroker。一个负责从本地接收,另一个负责监听对端,并处理对端发来的执行。

如果是一个mesh的配置,3台MQ组成的集群,每台MQ上会出现两个localBroker,两个remoteBroker。4台MQ则各是3个共6个……

感觉mesh方式来搭建MQ的集群,对MQ的集群压力还是挺大的啊。。

上一篇 下一篇

猜你喜欢

热点阅读