玩转大数据程序员@IT·互联网

Kafka的消息是如何被消费的?

2017-09-19  本文已影响2735人  扫帚的影子

GroupMetadata类
GroupMetadataManager类
def getGroup(groupId: String): GroupMetadata
def addGroup(group: GroupMetadata): GroupMetadata
def removeGroup(group: GroupMetadata)
GroupCoordinator类
       case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
       case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
       case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
       case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
       case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
       case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
       case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
       case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
       case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
c2.jpg
  1. 第二种情况: c1和c2已经在group中, 然后c1正常的退出离开
c1.jpg
  1. 第二种情况: c1和c2已经在group中, 然后c1非正常退出,比如说进程被kill掉
    流程跟上面的2基本上一致, 只不过(1)这步的触发条件不是LeaveGroupRequest, 而是来自c1的heartbeat的onExpireHeartbeat;
  2. 第四种情况: c1和c2已经在group中, 然后这个topic的partition增加, 这个时候服务端是无法主动触发的,客户端会定时去服务端同步metadata信息, 从新的metadata信息中客户端会获知partition有了变化, 此时c1和c2会重新发送JoinRequest来触发新的balance;
  3. 还有其它的两种情况, 这里就不一一说明了,总之就是利用这个状态机的转换来作相应的处理.

Kafka源码分析-汇总

上一篇 下一篇

猜你喜欢

热点阅读