4.php使用kafka

2022-12-25  本文已影响0人  呦丶耍脾气

本文使用docker容器操作

docker中部署kafka看下文档第二部分:2.kafka部署

$ docker network create app-tier --driver bridge
$ docker run -d --name zookeeper-server \
    --network app-tier \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest
$ docker run -d --name kafka-server \
    --network app-tier \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
    bitnami/kafka:latest
$ docker exec -it kafka-server /bin/bash
$ cd /opt/bitnami/kafka
#创建一个topic
$ /opt/bitnami/kafka$ bin/kafka-topics.sh --create --name test --bootstrap-server localhost:9092
#查看主题
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
#test

运行容器如下:


docker exec -it php /bin/bash
#git命令最好在phg的和宿主机的映射目录执行
git clone https://github.com/edenhill/librdkafka.git
 ./configure
 make
make install

然后安装扩展 rdkafka
pecl install rdkafka

php.ini配置文件写入
extension=rdkafka.so
php -m |grep kafka
docker exec -it kafka-server /bin/bash
cd /opt/bitnami/kafka
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
#producer.php
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.172.131:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
#consumer.php
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("192.168.172.131");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
php producer.php
#consumer执行后会一直监听,当有新的生产进来后仍会打印
php consumer.php 
上一篇下一篇

猜你喜欢

热点阅读