4. Using Kafka with PHP
2022-12-25
呦丶耍脾气
Everything in this article runs in Docker containers.
For deploying Kafka in Docker, see part 2 of this series: 2. Kafka deployment
$ docker network create app-tier --driver bridge
$ docker run -d --name zookeeper-server \
--network app-tier \
-e ALLOW_ANONYMOUS_LOGIN=yes \
bitnami/zookeeper:latest
$ docker run -d --name kafka-server \
--network app-tier \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
bitnami/kafka:latest
$ docker exec -it kafka-server /bin/bash
$ cd /opt/bitnami/kafka
# Create a topic
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
# List topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Output: test
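Running --create again for a topic that already exists fails with an error, so a small guard makes this step safe to repeat. The following is a minimal sketch, assuming it is used inside the kafka-server container from /opt/bitnami/kafka; topic_exists is a hypothetical helper name, not a Kafka tool.

```shell
#!/bin/sh
# topic_exists NAME: reads the output of `kafka-topics.sh --list` on stdin
# and succeeds only if NAME appears as a complete line.
# (topic_exists is a hypothetical helper, not part of Kafka.)
topic_exists() {
    grep -qxF -- "$1"
}

# Intended use inside the kafka-server container (shown as a comment, not run here):
#   bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | topic_exists test \
#     || bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
```

Matching the whole line (-x) with a fixed string (-F) keeps a topic such as test2 from being mistaken for test.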
At this point the zookeeper-server and kafka-server containers are both running (docker ps lists them).
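- Enter the php container and build librdkafka
docker exec -it php /bin/bash
# Best to run the git commands in the directory that is mapped between the php container and the host
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install
Then install the rdkafka extension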
pecl install rdkafka
Add the following line to the php.ini configuration file
extension=rdkafka.so
- Check that the extension is loaded
php -m | grep kafka
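If php -m shows nothing, a common cause is that the extension line went into a php.ini that PHP never loads. The sketch below, offered under assumptions rather than as the definitive procedure, parses the output of php --ini and php -m; loaded_ini and has_module are hypothetical helper names introduced for illustration.

```shell
#!/bin/sh
# loaded_ini: reads `php --ini` output on stdin and prints the value of the
# "Loaded Configuration File:" line (a path, or "(none)" if no php.ini is loaded).
loaded_ini() {
    sed -n 's/^Loaded Configuration File:[[:space:]]*//p'
}

# has_module NAME: reads `php -m` output on stdin and succeeds if NAME
# is listed as a module (case-insensitive, whole-line match).
has_module() {
    grep -qxiF -- "$1"
}

# Intended use inside the php container (shown as comments, not run here):
#   php --ini | loaded_ini
#   php -m | has_module rdkafka && echo "rdkafka is loaded"
```

If loaded_ini prints (none), PHP is running without a php.ini, and the extension=rdkafka.so line needs to go into a file that PHP does read, for example one in the directory that php --ini reports as scanned for additional .ini files.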
- Enter the kafka container and create the topic test (skip this step if the topic already exists)
docker exec -it kafka-server /bin/bash
cd /opt/bitnami/kafka
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
- Enter the php container and create producer.php and consumer.php in the working directory
#producer.php
<?php
$conf = new RdKafka\Conf();
// Address of the Kafka broker; replace with your own host and port
$conf->set('metadata.broker.list', '192.168.172.131:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA leaves the choice of partition to librdkafka
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    // Serve queued events without blocking
    $producer->poll(0);
}
// Flush the send queue before exiting, retrying up to 10 times
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
#consumer.php
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
// Broker address; replace with your own host
$rk->addBrokers("192.168.172.131");
$topicConf = new RdKafka\TopicConf();
// Configuration values are passed as strings
$topicConf->set('auto.commit.interval.ms', '100');
$topicConf->set('offset.store.method', 'broker');
// With no stored offset, start from the earliest message
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $rk->newTopic("test", $topicConf);
// Consume partition 0, resuming from the stored offset
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    // Second argument is the timeout in milliseconds
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
- Run the scripts
php producer.php
# consumer.php keeps listening once started and prints every newly produced message
php consumer.php
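To confirm from the Kafka side that the producer's messages actually reached the topic, the console consumer shipped with Kafka can read them back. This is a rough sketch under the assumption that it runs inside the kafka-server container from /opt/bitnami/kafka; count_messages is a hypothetical helper that counts lines in the "Message N" format written by producer.php.

```shell
#!/bin/sh
# count_messages: reads consumed records on stdin and prints how many lines
# match the "Message <number>" payload produced by producer.php.
# (count_messages is a hypothetical helper, not part of Kafka.)
count_messages() {
    grep -cE '^Message [0-9]+$'
}

# Intended use inside the kafka-server container (shown as a comment, not run here):
#   bin/kafka-console-consumer.sh --topic test --from-beginning \
#     --bootstrap-server localhost:9092 --timeout-ms 5000 | count_messages
```

With --timeout-ms the console consumer exits once no message has arrived for the given interval, so the pipeline terminates instead of listening forever. After a single run of producer.php against a fresh topic, the count would be expected to match the 10 messages sent.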