RocketMQ理论与实践

RocketMQ DefaultLitePullConsumer

2020-07-05  本文已影响0人  晴天哥_王志

开篇

DefaultLitePullConsumer拉取流程

DefaultLitePullConsumer消息拉取

消息拉取例子

/**
 * Minimal subscribe-mode example: create a lite pull consumer, point it at a
 * name server, subscribe to one topic, start it and print every polled batch
 * until {@link #running} is cleared.
 */
public class LitePullConsumerSubscribe {

    /** Loop flag; the polling loop exits once this becomes {@code false}. */
    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        // Build the consumer for the group, configure the name server and subscription.
        final DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");

        // Start the consumer, then poll until asked to stop; always shut down on exit.
        consumer.start();
        try {
            pollUntilStopped(consumer);
        } finally {
            consumer.shutdown();
        }
    }

    /** Polls and prints message batches for as long as {@link #running} stays {@code true}. */
    private static void pollUntilStopped(final DefaultLitePullConsumer consumer) throws Exception {
        while (running) {
            final List<MessageExt> batch = consumer.poll();
            System.out.printf("%s%n", batch);
        }
    }
}

创建消费者

DefaultLitePullConsumer

/**
 * Excerpt of the lite pull consumer's public configuration class: it holds the
 * tunable defaults shown below and creates the {@code DefaultLitePullConsumerImpl}
 * that it keeps in {@code defaultLitePullConsumerImpl}.
 */
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    // Implementation object created in the constructor; the original note calls it the message-pull implementation.
    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
    private String consumerGroup;
    // Timing defaults below are in milliseconds (20 s / 30 s / 10 s).
    private long brokerSuspendMaxTimeMillis = 1000 * 20;
    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
    private long consumerPullTimeoutMillis = 1000 * 10;
    // Defaults to cluster mode.
    private MessageModel messageModel = MessageModel.CLUSTERING;
    // Callback listener for message-queue changes.
    private MessageQueueListener messageQueueListener;
    private OffsetStore offsetStore;
    // Strategy used to allocate message queues; defaults to AllocateMessageQueueAveragely.
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
    private boolean unitMode = false;
    private boolean autoCommit = true;
    // Read by DefaultLitePullConsumerImpl's constructor to size its pull thread pool.
    private int pullThreadNums = 20;
    private long autoCommitIntervalMillis = 5 * 1000;
    private int pullBatchSize = 10;
    private long pullThresholdForAll = 10000;
    private int consumeMaxSpan = 2000;
    private int pullThresholdForQueue = 1000;
    private int pullThresholdSizeForQueue = 100;
    private long pollTimeoutMillis = 1000 * 5;
    private long topicMetadataCheckIntervalMillis = 30 * 1000;

    /**
     * Creates a consumer for the given group with no namespace and no RPC hook.
     *
     * @param consumerGroup consumer group name
     */
    public DefaultLitePullConsumer(final String consumerGroup) {
        this(null, consumerGroup, null);
    }

    /**
     * Creates a consumer and its backing implementation object.
     *
     * @param namespace     namespace stored on the inherited client config (may be null)
     * @param consumerGroup consumer group name
     * @param rpcHook       hook handed to the implementation (may be null)
     */
    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        // Create the DefaultLitePullConsumerImpl; note that `this` is passed before construction has returned.
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }
}

DefaultLitePullConsumerImpl

/**
 * Excerpt of the lite pull consumer implementation: state fields plus the
 * constructor that wires up the pull thread pool and a single-thread scheduler.
 */
public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private final InternalLogger log = ClientLogger.getLog();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    // Lifecycle state; starts as CREATE_JUST.
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    // Client instance; the original note calls it the core object for message consumption.
    protected MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    // Load balancer for the consume queues.
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    // The original note on subscribe() says subscribe and assign are mutually exclusive modes.
    private enum SubscriptionType {
        NONE, SUBSCRIBE, ASSIGN
    }

    private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
    private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
    private SubscriptionType subscriptionType = SubscriptionType.NONE;
    // Overwritten in the constructor from the consumer's configuration.
    private long pullTimeDelayMillsWhenException = 1000;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
    private DefaultLitePullConsumer defaultLitePullConsumer;
    // Pull task registered for each message queue.
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
    // Message queues assigned to this consumer.
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
    // Queue of ConsumeRequest objects; the original note calls it the structure for consuming messages.
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
    private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
    private long consumeRequestFlowControlTimes = 0L;
    private long queueFlowControlTimes = 0L;
    private long queueMaxSpanFlowControlTimes = 0L;
    private long nextAutoCommitDeadline = -1L;
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();

    /**
     * Builds the implementation for the given facade.
     *
     * @param defaultLitePullConsumer owning facade; its pull-thread count, consumer group and
     *                                exception delay are read here
     * @param rpcHook                 RPC hook stored for later use (may be null)
     */
    public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
        this.defaultLitePullConsumer = defaultLitePullConsumer;
        this.rpcHook = rpcHook;

        // Thread pool for pull tasks, sized by getPullThreadNums(); threads are named after the consumer group.
        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
            this.defaultLitePullConsumer.getPullThreadNums(),
            new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
        );

        // Single-thread scheduler whose thread is named "MonitorMessageQueueChangeThread".
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MonitorMessageQueueChangeThread");
            }
        });
        this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
    }
}

订阅消费主题

DefaultLitePullConsumer / DefaultLitePullConsumerImpl

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {

    /**
     * Subscribes to a topic by delegating to the implementation object, after
     * passing the topic name through {@code withNamespace}.
     *
     * @param topic         topic name as given by the caller
     * @param subExpression subscription filter expression
     * @throws MQClientException if the implementation rejects the subscription
     */
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        final String namespacedTopic = withNamespace(topic);
        this.defaultLitePullConsumerImpl.subscribe(namespacedTopic, subExpression);
    }
}


public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // Load balancer for the consume queues.
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
    // Holds the MessageQueue objects this consumer is responsible for.
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();

    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    /**
     * Registers a subscription for {@code topic}.
     *
     * <p>Any exception raised along the way, including the argument check, is
     * wrapped in an {@link MQClientException}.
     *
     * @param topic         topic to subscribe to; must be non-null and non-empty
     * @param subExpression subscription filter expression
     * @throws MQClientException wrapping whatever failure occurred
     */
    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }

            // Switch to SUBSCRIBE mode and build the subscription descriptor.
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            final String group = defaultLitePullConsumer.getConsumerGroup();
            final SubscriptionData data = FilterAPI.buildSubscriptionData(group, topic, subExpression);

            // Record the subscription with the rebalancer.
            this.rebalanceImpl.getSubscriptionInner().put(topic, data);

            // Install the queue-change callback on the facade.
            final MessageQueueListenerImpl queueListener = new MessageQueueListenerImpl();
            this.defaultLitePullConsumer.setMessageQueueListener(queueListener);

            // Hand the rebalancer to the assigned-queue holder.
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);

            // Nothing more to do unless the consumer is already running.
            if (serviceState != ServiceState.RUNNING) {
                return;
            }
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            updateTopicSubscribeInfoWhenSubscriptionChanged();
        } catch (Exception e) {
            throw new MQClientException("subscribe exception", e);
        }
    }
}

启动消息拉取消费者

DefaultLitePullConsumerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    protected MQClientInstance mQClientFactory;

    /**
     * Starts the consumer. Only valid from {@code CREATE_JUST}; every other
     * known state is rejected.
     *
     * <p>The state is set to {@code START_FAILED} up front so that an exception
     * anywhere in the sequence leaves the consumer marked as failed; it only
     * becomes {@code RUNNING} after the client instance and scheduled tasks
     * have been started.
     *
     * @throws MQClientException if the consumer is not in {@code CREATE_JUST},
     *                           or if any initialisation step fails
     */
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                // Obtain the client instance and register this consumer with it.
                initMQClientFactory();
                // Initialise the message-queue load balancer.
                initRebalanceImpl();
                initPullAPIWrapper();
                // Initialise the consume-offset store.
                initOffsetStore();
                // Start the underlying MQClientInstance.
                mQClientFactory.start();
                startScheduleTask();
                this.serviceState = ServiceState.RUNNING;
                operateAfterRunning();

                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

    /**
     * Obtains the shared {@code MQClientInstance} and registers this consumer
     * under its consumer group.
     *
     * @throws MQClientException if registration is rejected, i.e. the group is
     *                           null or another consumer already holds it
     */
    private void initMQClientFactory() throws MQClientException {
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);

        final String group = this.defaultLitePullConsumer.getConsumerGroup();
        boolean registerOK = mQClientFactory.registerConsumer(group, this);
        if (!registerOK) {
            // Previously the result was ignored and startup carried on with an
            // unregistered consumer. Roll the state back so start() can be
            // retried once the group conflict is resolved.
            this.serviceState = ServiceState.CREATE_JUST;
            throw new MQClientException("The consumer group[" + group
                + "] could not be registered; it is null or has been created before, specify another name please.",
                null);
        }
    }
}


public class MQClientInstance {

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

    /**
     * Registers {@code consumer} under {@code group} unless that group is
     * already taken.
     *
     * @param group    consumer group name used as the table key
     * @param consumer consumer to register
     * @return {@code true} if the consumer was stored; {@code false} if either
     *         argument is null or the group already has a consumer
     */
    public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
        if (group == null || consumer == null) {
            return false;
        }

        final MQConsumerInner existing = this.consumerTable.putIfAbsent(group, consumer);
        final boolean registered = existing == null;
        if (!registered) {
            log.warn("the consumer group[" + group + "] exist already.");
        }
        return registered;
    }
}

MQClientInstance

/**
 * Excerpt of the client instance: the constructor that builds its collaborators
 * and {@code start()}, which starts them in a fixed order.
 */
public class MQClientInstance {

    /**
     * Wires up the remoting client, admin helper, pull and rebalance services,
     * an internal producer and the consumer stats manager.
     *
     * @param clientConfig  configuration copied into the Netty client config and the internal producer
     * @param instanceIndex index stored on this instance
     * @param clientId      identifier stored on this instance
     * @param rpcHook       hook passed to the remoting API implementation
     */
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

        // Seed the name server list when an address was configured explicitly.
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        }

        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this);
        this.pullMessageService = new PullMessageService(this);
        this.rebalanceService = new RebalanceService(this);
        // Internal producer created under the built-in client-inner producer group.
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    }

    /**
     * Starts the client instance if it is still in {@code CREATE_JUST}.
     *
     * <p>The state is set to {@code START_FAILED} before any work is done and
     * only becomes {@code RUNNING} after every step has completed. States other
     * than {@code CREATE_JUST} and {@code START_FAILED} fall through silently.
     *
     * @throws MQClientException if a previous start attempt left the instance in {@code START_FAILED}
     */
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start the internal producer (the original comment labels this "push service")
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
}

RebalanceService

/**
 * Background service thread that periodically asks the client instance to
 * rebalance its consumers.
 */
public class RebalanceService extends ServiceThread {

    // Pause between rebalance rounds in milliseconds; overridable through the
    // "rocketmq.client.rebalance.waitInterval" system property.
    private static long waitInterval =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    /** Waits, then rebalances, repeating until the service is stopped. */
    @Override
    public void run() {
        for (;;) {
            if (this.isStopped()) {
                return;
            }
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}


public class MQClientInstance {

    public void doRebalance() {
        // consumerTable保存key为LitePullConsumer的DefaultLitePullConsumerImpl对象
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                }
            }
        }
    }
}

RebalanceImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // RebalanceLitePullImpl is the RebalanceImpl subclass used by the lite pull consumer.
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    /** Delegates to the rebalancer, always with {@code isOrder == false}; does nothing when it is unset. */
    public void doRebalance() {
        final RebalanceImpl rebalancer = this.rebalanceImpl;
        if (rebalancer == null) {
            return;
        }
        rebalancer.doRebalance(false);
    }
}


/**
 * Excerpt of the rebalancing base class: walks every subscribed topic and
 * recomputes which message queues this client should own.
 */
public abstract class RebalanceImpl {

    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
        new ConcurrentHashMap<String, Set<MessageQueue>>();
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();

    /**
     * Rebalances every topic in the subscription table, then calls
     * {@code truncateMessageQueueNotMyTopic()}.
     *
     * <p>A failure for one topic is logged (except for retry topics) and does
     * not prevent the other topics from being processed.
     *
     * @param isOrder passed through to the process-queue update
     */
    public void doRebalance(final boolean isOrder) {
        // Subscription table: topic -> subscription data.
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }


    /** @return the live topic-to-subscription map (not a copy) */
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return subscriptionInner;
    }


    /**
     * Recomputes the queue assignment for one topic.
     *
     * <p>In broadcasting mode the full queue set of the topic is used. In
     * clustering mode the configured strategy picks this client's share from
     * the sorted queue list and the sorted consumer id list. In both modes
     * {@code messageQueueChanged} is called only when the process-queue table
     * actually changed.
     *
     * @param topic   topic to rebalance
     * @param isOrder passed through to the process-queue update
     */
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {

                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                // Report missing inputs instead of skipping silently, matching
                // the broadcasting branch. Retry topics are left quiet, as in
                // doRebalance.
                if (mqSet == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                if (cidAll == null) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    // Sort both lists before calling the strategy.
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        // Keep the current assignment, but surface why the allocation failed.
                        log.error("AllocateMessageQueueStrategy.allocate Exception, topic[" + topic + "]", e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}


public class RebalanceLitePullImpl extends RebalanceImpl {

    /**
     * Forwards a queue-assignment change to the listener installed on the
     * lite pull consumer, if there is one. Listener failures are logged and
     * not propagated.
     *
     * @param topic     topic whose assignment changed
     * @param mqAll     first queue set passed by the rebalancer
     * @param mqDivided second queue set passed by the rebalancer
     */
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        final MessageQueueListener listener =
            this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (listener == null) {
            return;
        }
        try {
            listener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

MessageQueueListenerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();

    /** Listener that reacts to rebalance results by refreshing assignments and pull tasks. */
    class MessageQueueListenerImpl implements MessageQueueListener {
        /**
         * Chooses the queue set for the current message model ({@code mqAll}
         * when broadcasting, {@code mqDivided} when clustering) and applies it.
         * Any other model is ignored.
         */
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            final Set<MessageQueue> effective;
            switch (defaultLitePullConsumer.getMessageModel()) {
                case BROADCASTING:
                    effective = mqAll;
                    break;
                case CLUSTERING:
                    effective = mqDivided;
                    break;
                default:
                    return;
            }
            updateAssignedMessageQueue(topic, effective);
            updatePullTask(topic, effective);
        }
    }

    /** Records the queues assigned for {@code topic} in the assigned-queue holder. */
    private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
        this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
    }

    /**
     * Cancels and removes the pull tasks of {@code topic} whose queue is no
     * longer in {@code mqNewSet}, then starts tasks for queues that lack one.
     */
    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        for (Iterator<Map.Entry<MessageQueue, PullTaskImpl>> cursor = this.taskTable.entrySet().iterator();
             cursor.hasNext();) {
            final Map.Entry<MessageQueue, PullTaskImpl> candidate = cursor.next();
            final MessageQueue queue = candidate.getKey();
            if (queue.getTopic().equals(topic) && !mqNewSet.contains(queue)) {
                candidate.getValue().setCancelled(true);
                cursor.remove();
            }
        }
        startPullTask(mqNewSet);
    }

    /** Creates, registers and immediately schedules a pull task for each queue without one. */
    private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue queue : mqSet) {
            if (this.taskTable.containsKey(queue)) {
                continue;
            }
            final PullTaskImpl task = new PullTaskImpl(queue);
            this.taskTable.put(queue, task);
            this.scheduledThreadPoolExecutor.schedule(task, 0, TimeUnit.MILLISECONDS);
        }
    }
}

DefaultLitePullConsumerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    public class PullTaskImpl implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;

        public PullTaskImpl(final MessageQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {

            if (!this.isCancelled()) {
                // 从assignedMessageQueue获取ProcessQueue对象
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

                long offset = nextPullOffset(messageQueue);
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        String topic = this.messageQueue.getTopic();
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        String topic = this.messageQueue.getTopic();
                        subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                            topic, SubscriptionData.SUB_ALL);
                    }
                    // 执行消息的拉取动作
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) {
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    // 拉取的消息放置到processQueue当中
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    // submitConsumeRequest负责保存待消费的任务
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    // 更新下一次拉取的位移
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                }

                // 重新投递消息拉取任务
                if (!this.isCancelled()) {
                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
                } else {
                }
            }
        }
    }

    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }


    public class ConsumeRequest {
        private final List<MessageExt> messageExts;
        private final MessageQueue messageQueue;
        private final ProcessQueue processQueue;

        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
            final ProcessQueue processQueue) {
            this.messageExts = messageExts;
            this.messageQueue = messageQueue;
            this.processQueue = processQueue;
        }
    }
}

消息的消费

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    /**
     * Returns the next pulled batch, waiting up to {@code timeout} milliseconds.
     *
     * <p>Batches whose process queue is marked dropped are skipped for as long
     * as time remains. When a usable batch is found its messages are removed
     * from the process queue and the consume offset of its message queue is
     * updated before the messages are returned.
     *
     * @param timeout maximum wait in milliseconds; must not be negative
     * @return the messages of one batch, or an empty list if none became
     *         available in time or the wait was interrupted
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    public synchronized List<MessageExt> poll(long timeout) {
        try {
            checkServiceState();
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (defaultLitePullConsumer.isAutoCommit()) {
                maybeAutoCommit();
            }
            // Absolute deadline; every wait below is bounded by the time left until it.
            long endTime = System.currentTimeMillis() + timeout;

            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

            // While time remains, discard batches that belong to dropped process queues.
            if (endTime - System.currentTimeMillis() > 0) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0)
                        break;
                }
            }

            // The dropped check is repeated because the loop above can exit on the deadline.
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                // NOTE(review): resetTopic is defined elsewhere; the original note says it sets the
                // topic of the messages for the retry queue — confirm against its implementation.
                this.resetTopic(messages);
                return messages;
            }
        } catch (InterruptedException ignore) {
            // An interrupted wait is swallowed here; the method falls through to the empty-list return.
        }

        return Collections.emptyList();
    }
}
上一篇 | 下一篇

猜你喜欢

热点阅读