大数据玩转大数据大数据 爬虫Python AI Sql

Librdkafka的基础数据结构 3 -- Buffer相关

2018-01-12  本文已影响97人  扫帚的影子

struct rd_kafka_buf_s
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
    TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; // 这个rd_kafka_buf_s定义为tailq的元素

    int32_t rkbuf_corrid;  // 对应于kafka协议中request header里的CorrelationId

    rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */ //发送request的重试的绝对时间

    int     rkbuf_flags; /* RD_KAFKA_OP_F */

        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */  // 发送或接收数据的rd_buf_t
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */ // 上面rd_buf_t的只读映射

    int     rkbuf_connid;      /* broker connection id (used when buffer
                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used */   // 接收response时, kafka协议的前四个字节表示payload长度, 这个表示payload有多长, 即后面需要再接收的数据长度

    rd_crc32_t rkbuf_crc;      /* Current CRC calculation */ //CRC校验

    struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                 * These fields are encoded
                                                 * and written to output buffer
                                                 * on buffer finalization. */
    struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                 * Decoded fields are copied
                                                 * here from the buffer
                                                 * to provide an ease-of-use
                                                 * interface to the header */

    int32_t rkbuf_expected_size;  /* expected size of message */

        // response的入队列
        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                 * for retries from inside
                                                 * the rkbuf_cb() callback
                                                 * since rkbuf_replyq will
                                                 * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */

        struct rd_kafka_broker_s *rkbuf_rkb;  // 相关联的broker

    rd_refcnt_t rkbuf_refcnt; // 引用计数
    void   *rkbuf_opaque;

       // 重试次数
    int     rkbuf_retries;            /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */

        int     rkbuf_features;   /* Required feature(s) that must be
                                   * supported by broker. */

    rd_ts_t rkbuf_ts_enq;
    rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                   * after response: RTT. */
    rd_ts_t rkbuf_ts_timeout;

        int64_t rkbuf_offset;     /* Used by OffsetCommit */  // 需要提交的offset

    rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                    * Used by FetchRequest. */

    rd_kafka_msgq_t rkbuf_msgq;

        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */

        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char *reason;       /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        int all_topics;     /* Full/All topics requested */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to metadata
                                             * cache's full_.._sent.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;

                } Metadata;
        } rkbuf_u;
};
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size) {
        rd_kafka_buf_t *rkbuf;

        /* Make room for common protocol request headers */
        // 计算size 更新, 加上request header大小, 包括client id
        size += RD_KAFKAP_REQHDR_SIZE +
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);

       // rd_buffer_t中的segment个数加上, 包括一个header
        segcnt += 1; /* headers */

       按指定的segment个数和size来创建rd_kafka_buf
        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

        rkbuf->rkbuf_rkb = rkb;
        rd_kafka_broker_keep(rkb);

        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

        // 写kafka request header
        /* Write request header, will be updated later. */
        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
        /* ApiKey */
        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
        /* ApiVersion: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);
        /* CorrId: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* ClientId */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

        return rkbuf;
}
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                        const void *data, size_t len) {
        size_t r;

        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);

        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);

        return r;
}
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                          const void *data, size_t len) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *)) {
        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

       //先判断是否需要重试
        if (unlikely(!rkb ||
             rkb->rkb_source == RD_KAFKA_INTERNAL ||
             rd_kafka_terminating(rkb->rkb_rk) ||
             rkbuf->rkbuf_retries + 1 >
             rkb->rkb_rk->rk_conf.max_retries))
                return 0;

    /* Try again */
    rkbuf->rkbuf_ts_sent = 0;
    rkbuf->rkbuf_retries++;
    rd_kafka_buf_keep(rkbuf);
        // 加入broker的重试队列里
    rd_kafka_broker_buf_retry(rkb, rkbuf);
    return 1;
}
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *request, *response;

        request = rko->rko_u.xbuf.rkbuf;
        rko->rko_u.xbuf.rkbuf = NULL;

        /* NULL on op_destroy() */
    if (request->rkbuf_replyq.q) {
        int32_t version = request->rkbuf_replyq.version;
                /* Current queue usage is done, but retain original replyq for
                 * future retries, stealing
                 * the current reference. */
                request->rkbuf_orig_replyq = request->rkbuf_replyq;
                rd_kafka_replyq_clear(&request->rkbuf_replyq);
        /* Callback might need to version check so we retain the
         * version across the clear() call which clears it. */
        request->rkbuf_replyq.version = version;
    }

    if (!request->rkbuf_cb) {
        rd_kafka_buf_destroy(request);
        return;
    }

        /* Let buf_callback() do destroy()s */
        response = request->rkbuf_response; /* May be NULL */
        request->rkbuf_response = NULL;

       // 获取到reqeust和response后触发回调
        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
                  request->rkbuf_rkb, err,
                              response, request);
}
struct rd_kafka_bufq_t
typedef struct rd_kafka_bufq_s {
    TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
    rd_atomic32_t  rkbq_cnt;
    rd_atomic32_t  rkbq_msg_cnt;
} rd_kafka_bufq_t;

Librdkafka源码分析-Content Table

上一篇下一篇

猜你喜欢

热点阅读