Java World玩转大数据大数据,机器学习,人工智能

Kafka的Request和Response

2016-12-30  本文已影响849人  扫帚的影子

基础数据结构类:

Type类:

public abstract void write(ByteBuffer buffer, Object o);
public abstract Object read(ByteBuffer buffer);
public abstract Object validate(Object o);
public abstract int sizeOf(Object o);
public boolean isNullable();
public static final Type INT8
public static final Type INT16
public static final Type INT32
public static final Type INT64
public static final Type STRING
public static final Type BYTES
public static final Type NULLABLE_BYTES

ArrayOf类:

Field类:

    final int index;
    public final String name;
    public final Type type;
    public final Object defaultValue;
    public final String doc;
    final Schema schema;

Schema类:

Sturct类:

    private final Schema schema;
    private final Object[] values;

协议相关类型:

Protocol类:

public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
                                                           new Field("api_version", INT16, "The version of the API."),
                                                           new Field("correlation_id",
                                                                     INT32,
                                                                     "A user-supplied integer value that will be passed back with the response"),
                                                           new Field("client_id",
                                                                     STRING,
                                                                     "A user specified identifier for the client making the request."));
...

ApiKeys类:

    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr"),
    STOP_REPLICA(5, "StopReplica"),
    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
    OFFSET_COMMIT(8, "OffsetCommit"),
    OFFSET_FETCH(9, "OffsetFetch"),
    GROUP_COORDINATOR(10, "GroupCoordinator"),
    JOIN_GROUP(11, "JoinGroup"),
    HEARTBEAT(12, "Heartbeat"),
    LEAVE_GROUP(13, "LeaveGroup"),
    SYNC_GROUP(14, "SyncGroup"),
    DESCRIBE_GROUPS(15, "DescribeGroups"),
    LIST_GROUPS(16, "ListGroups");

Request和Response相关类型

每个Request和Response都由RequestHeader(ResponseHeader) + 具体的消费体构成;

AbstractRequestResponse类:

public int sizeOf()
public void writeTo(ByteBuffer buffer)
public String toString()
public int hashCode()
public boolean equals(Object obj)

AbstractRequest类:

public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
        switch (ApiKeys.forId(requestId)) {
            case PRODUCE:
                return ProduceRequest.parse(buffer, versionId);
            case FETCH:
                return FetchRequest.parse(buffer, versionId);
            case LIST_OFFSETS:
                return ListOffsetRequest.parse(buffer, versionId);
            case METADATA:
                return MetadataRequest.parse(buffer, versionId);
            case OFFSET_COMMIT:
                return OffsetCommitRequest.parse(buffer, versionId);
            case OFFSET_FETCH:
                return OffsetFetchRequest.parse(buffer, versionId);
            case GROUP_COORDINATOR:
                return GroupCoordinatorRequest.parse(buffer, versionId);
            case JOIN_GROUP:
                return JoinGroupRequest.parse(buffer, versionId);
            case HEARTBEAT:
                return HeartbeatRequest.parse(buffer, versionId);
            case LEAVE_GROUP:
                return LeaveGroupRequest.parse(buffer, versionId);
            case SYNC_GROUP:
                return SyncGroupRequest.parse(buffer, versionId);
            case STOP_REPLICA:
                return StopReplicaRequest.parse(buffer, versionId);
            case CONTROLLED_SHUTDOWN_KEY:
                return ControlledShutdownRequest.parse(buffer, versionId);
            case UPDATE_METADATA_KEY:
                return UpdateMetadataRequest.parse(buffer, versionId);
            case LEADER_AND_ISR:
                return LeaderAndIsrRequest.parse(buffer, versionId);
            case DESCRIBE_GROUPS:
                return DescribeGroupsRequest.parse(buffer, versionId);
            case LIST_GROUPS:
                return ListGroupsRequest.parse(buffer, versionId);
            default:
                return null;
        }
    }

实现上是调用各个具体Request对象的parse方法根据bytebuffer和versionid来产生具体的Request对象;

ProduceRequest类:

    private final short acks;
    private final int timeout;
    private final Map<TopicPartition, ByteBuffer> partitionRecords;
public ProduceRequest(Struct struct) {
        super(struct);
        partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
            Struct topicData = (Struct) topicDataObj;
            String topic = topicData.getString(TOPIC_KEY_NAME);
            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
                Struct partitionResponse = (Struct) partitionResponseObj;
                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
                partitionRecords.put(new TopicPartition(topic, partition), records);
            }
        }
        acks = struct.getShort(ACKS_KEY_NAME);
        timeout = struct.getInt(TIMEOUT_KEY_NAME);
    }

RequestHeader类:

    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");

关系图:

request_response.png

实际上在 core/src/main/scala/kafka/api下也定义了各种Request和Response:

NOTE: this map only includes the server-side request/response handlers. Newer
request types should only use the client-side versions which are parsed with
o.a.k.common.requests.AbstractRequest.getRequest()

     val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) =>       
     RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
        FetchKey -> ("Fetch", FetchRequest.readFrom),
        OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
        MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
        LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
        ControlledShutdownKey -> ("ControlledShutdown",   
        ControlledShutdownRequest.readFrom),
        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
def readFrom(buffer: ByteBuffer): ProducerRequest = {
    val versionId: Short = buffer.getShort
    val correlationId: Int = buffer.getInt
    val clientId: String = readShortString(buffer)
    val requiredAcks: Short = buffer.getShort
    val ackTimeoutMs: Int = buffer.getInt
    //build the topic structure
    val topicCount = buffer.getInt
    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
      // process topic
      val topic = readShortString(buffer)
      val partitionCount = buffer.getInt
      (1 to partitionCount).map(_ => {
        val partition = buffer.getInt
        val messageSetSize = buffer.getInt
        val messageSetBuffer = new Array[Byte](messageSetSize)
        buffer.get(messageSetBuffer,0,messageSetSize)
        (TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
      })
    })

请求生成与保存

Kafka协议官网地址

下一篇Kafka初始化流程与请求处理

Kafka源码分析-汇总
上一篇下一篇

猜你喜欢

热点阅读