在PHP中使用Kafka

2023-07-17  本文已影响0人  编程放大镜

最近项目中需要从Kafka中读取消息,记录一下。

安装扩展

https://l1905.github.io/kafka/php/2020/07/07/php-user-kafka/
apt install librdkafka-dev
pecl install rdkafka

消费(从指定的 partition)

conf = new \RdKafka\Conf();conf->set

librdkafka 配置项说明
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
conf->setDefaultTopicConf(topicConf);

        $conf = new \RdKafka\Conf();
        $conf->set('group.id','ali_icebox_refund');
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers(\Cake\Core\Configure::read('ubox_kafka_service'));

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);
        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic('ali_icebox_refund', $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
            // 设置消费时的时间间隔,单位毫秒,以下表示60秒消费一个
            $message = $topic->consume(0, 5000);
            if ($message) {
                echo "读取到消息\n\r";

                // 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
                var_dump($message);

                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        echo "读取消息成功:\n\r";
                        var_dump($message->payload);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "读取消息失败\n\r";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "请求超时\n\r";
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            } else {
                echo "未读取到消息\n\r";
            }

关于消费

低级消费

需要必备的字段 :

必须指定group.id和partition, 因为消费偏移量是 和二者直接关联

重要的参数设置:

常见问题:

  1. 高级消费和低级消费的区别?
    高级消费即不需要指定消费的分区, sdk自动帮你选择消费的分区

  2. 希望重新消费数据,需要怎么操作
    可以重设消费偏移量 lowLevelConsume方法的option参数传递 consume_start_offset.

    • RD_KAFKA_OFFSET_BEGINNING 从最开始开始消费
    • RD_KAFKA_OFFSET_STORED 从上次消费的位置开始消费
    • RD_KAFKA_OFFSET_END 从最新位置开始消费
    • rd_kafka_offset_tail(xxx) 从最新偏移量的某个位置开始消费
  3. 是否支持批量消费.
    sdk支持,但我们的调用方法没有封装, 具体使用参考

高级消费

需要必备的字段 :

重要的参数设置:

和低级消费参数一致

上一篇下一篇

猜你喜欢

热点阅读