swoole结合kafka实现超高性能消息队列

2019-12-31  本文已影响0人  Bing的天涯路

使用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的消息队列服务,主要用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。

首先连接kafka,这里的kaka我使用的百度云提供的kafka服务,自己部署太麻烦而且难以维护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,其实最想使用的是微博的那个不使用扩展来连接kafka的库,但是一直没有解决使用ssl文件连接的问题,因此就是用了rdkafka扩展,首先按照样例中的说明安装librdkafka

sh setup-librdkafka.sh
pecl install rdkafka
echo "extension=rdkafka.so" >> /etc/php.ini //根据实际位置

这样就安装好了librdkafka和php扩展,要注意的是版本号必须要新一些的,否则使用ssl的会报没有该设置项的异常,排查这个异常花了一晚上的时间。

接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会使用到consumer,因为producer的数据是来自天工的设备数据

kafka.php – 连接kafka的基类

namespace App\Lib\Kafka;
 
use \RdKafka\Conf;
use \RdKafka\KafkaConsumer;
use Swoole\Exception;
 
 
class Kafka
{
    private $topic = '';
 
    private $config = [
        'broker' => 'xxxxxxxxx:9092',
        'security_protocol' => 'ssl',
        'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem',
        'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key',
        'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem',
        'group_id' => 'kafka-feifei-swoole-consumer'
    ];
 
    public function __construct($topic)
    {
        if(!extension_loaded(rdkafka)){
            throw new Exception('rdkafka.so扩展必须开启');
        }
 
        if(!isset($topic) || empty($topic)){
            throw new Exception('kafak实例化必须设置topic');
        }
        $this->topic = $topic;
    }
 
    public function subscribe(){
        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', $this->config['broker']);
        $conf->set('group.id', $this->config['group_id'].rand(0,10));
        $conf->set('security.protocol', $this->config['security_protocol']);
        $conf->set('ssl.certificate.location', $this->config['client_pem']);
        $conf->set('ssl.key.location', $this->config['client_key']);
        $conf->set('ssl.ca.location', $this->config['ca_pem']);
        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe([$this->topic]);
        return $consumer;
    }
 
}

这里需要特别注意的是PHPstorm的代码检查器好像找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。这里只实现了消费者,要使用消费者需要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事件

public static function mainServerCreate(EventRegister $register)
{
    // TODO: Implement mainServerCreate() method.
//    注册Kafka消费事件, 开三个进程来处理
    $allNum = 3;
    for($i = 0; $i < $allNum; $i++){
        ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
    }
}

这里new的Consumer就是处理消费的进程

Consumer.php

<?php
/**
 * Created by bingxiong.
 * Date: 4/8/19
 * Time: 10:22 PM
 * Description:
 */
 
namespace App\Lib\Kafka;
use EasySwoole\Component\Process\AbstractProcess;
 
class Consumer extends AbstractProcess
{
    private $isRun = false;
    public function run($arg)
    {
 
        // 在这里处理kafka连接
        // TODO: Implement run() method.
        $this->addTick(500,function (){
            if(!$this->isRun){
                $this->isRun = true;
                // 连接kafka并订阅TOPIC
                $kafka = new Kafka('xxxxxxxxxxxx');//topic
                $consumer = $kafka->subscribe();
                while(true){
                    try{
                        $message = $consumer->consume(120*1000);
                        if($message){
                            switch ($message->err) {
                                case RD_KAFKA_RESP_ERR_NO_ERROR:
                                    echo 'process name is'.$this->getProcessName().'\n';
                                    echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\n";
                                    break;
                                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                                    echo "No more messages; will wait for more\n";
                                    break;
                                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                                    echo "Timed out\n";
                                    break;
                                default:
                                    throw new \Exception($message->errstr(), $message->err);
                                    break;
                            }
                        }else{
                            break;
                        }
                    }catch (\Throwable $throwable){
                        break;
                    }
                }
 
                $this->isRun = false;
            }
            var_dump($this->getProcessName().' task run check');
        });
    }
 
    public function onShutDown()
    {
        // TODO: Implement onShutDown() method.
    }
 
    public function onReceive(string $str, ...$args)
    {
        // TODO: Implement onReceive() method.
    }
}

这里使用了一个异步任务addTick,如果长期没有消息的话也会每500秒去检查一下有没有新的消息。这里还是用了一个死循环,在这里死循环中持续处理消息过来之后的逻辑


1.png

现在已经使用swoole+kafka拿到设备的数据了,接下来就是使用异步任务或者异步redis之类的去执行相应的业务逻辑了。

本文作者熊冰,个人网站Bing的天涯路,转载请注明出处。

上一篇 下一篇

猜你喜欢

热点阅读