Kafka源码分析-Consumer(4)-Subscripti

2018-11-24  本文已影响0人  陈阳001

KafkaConsumer从Kafka拉取消息时发送的请求时FetchRequest,其中需要指定消费者希望拉取的起始消息的offset。为了消费者快速获取这个值,KafkaConsumer使用SubscriptionState来追踪TopicPartition和offset对应关系。


image.png image.png image.png

SubscriptionType是SubscriptionState的一个内部枚举类型,表示订阅Topic的模式:

/**
     * This method sets the subscription type if it is not already set (i.e. when it is NONE),
     * or verifies that the subscription type is equal to the give type when it is set (i.e.
     * when it is not NONE)
     * @param type The given subscription type
     */
    private void setSubscriptionType(SubscriptionType type) {
        //如果是NONE,则可以指定其他模式
        if (this.subscriptionType == SubscriptionType.NONE)
            this.subscriptionType = type;
        else if (this.subscriptionType != type)//如果已经指定了其他模式,就会报错
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

SubscriptionState核心字段:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        
        //用户未指定ConsumerRebalanceListener时,默认使用NoOpConsumerRebalanceListener,但是
        //所有的方法都是空的。
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");
      setSubscriptionType(SubscriptionType.AUTO_TOPICS);//选择AUTO_TOPICS模式

        this.listener = listener;

        changeSubscription(topics);
    }


public void changeSubscription(Collection<String> topicsToSubscribe) {
        //如果订阅的Topic发生了变化
        if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
            this.subscription.clear();//情况subscription集合
            this.subscription.addAll(topicsToSubscribe);//添加订阅的Topic
            this.groupSubscription.addAll(topicsToSubscribe);
            this.needsPartitionAssignment = true;//标记需要重新分配分区

            // Remove any assigned partitions which are no longer subscribed to
            for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
                TopicPartition tp = it.next();
                if (!subscription.contains(tp.topic()))
                    it.remove();
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读