基于thinkphp5、swoole和easywechat微信模

2020-11-25  本文已影响0人  雁渡寒潭丶

问题

微信消息推送万级、十万百万级用传统遍历的方法就很鸡肋了,而且很容易造成内存占满和502崩溃,所以要用异步多线程处理。我们在业务中很常见,经常需要发送一些活动或者开课提醒之类的一次性推送给微信公众号粉丝。用传统方法很费人力而且操作不当容易造成重发或者漏发,这样很容易流失粉丝。而网上的一些第三方平台有这方面的服务基本上都是按照粉丝量按年收费。我之前也试用过,也只能一次性给你发一两千人。一年最低的粉丝量费用要880。所以还是自己硬着头皮给它搞出来!经过一番折腾,终于搞出来了!

环境需要

1.php安装swoole扩展。自行百度,必须是linux环境,如果是使用的宝塔直接在商店所安装的php设置里安装swoole扩展。同时放行9501端口号
2.composer安装easywechat。很好用的一款微信相关的sdk。
3.利用thinkphp5或者以上版本自定义command来处理。方便之后做定时任务处理:at或crontab来处理任务。at是在特定时间只执行一次;crontab是每隔多久执行一次。

代码实现

<?php

namespace app\command;

use app\admin\model\WxOpenid;
use app\common\command\Base;
use Swoole\Client as SwooleClient;
use think\console\Input;
use think\console\Output;
use think\console\input\Argument;
use think\console\input\Option;
use libs\Log;
use EasyWeChat\Factory;
use Swoole\Server as SwooleServer;

class Message extends Base
{
    /**
     * 配置参数
     */
    const TEST_OPENID = '测试openid';
    const TEMPLATE_ID = '模板id';
    const URL = '跳转链接';
    const TEMPLATE_DATA = [
        'first' => '内容1',
        'keyword1' => '内容2',
        'keyword2' => '全体用户',
        'remark' => ['value' => '点击此条消息进入学习', 'color' => '#FF0000'],
    ];
    const APP_ID = "微信APP_ID";
    const APP_SECRET = "微信APP_SECRET";


    const SWOOLE_TESK = [
        'task_worker_num' => 480,
        'reactor_num' => 24,
        'worker_num' => 24
    ];
    protected $app;

    protected function configure()
    {
        $this->setName('msg')
            ->addOption('test', 't', Option::VALUE_NONE, '测试方法')
            ->addOption('client', 'c', Option::VALUE_NONE, 'swoole客户端')
            ->addOption('sync', 's', Option::VALUE_NONE, 'openid同步到库')
            ->addOption('send', 'e', Option::VALUE_NONE, '发送模板消息')
            ->addOption('required', null, Option::VALUE_REQUIRED, 'this is a value_required option')
            ->addOption('optional', null, Option::VALUE_OPTIONAL, 'this is a value_optional option')
            ->addArgument('optional', Argument::OPTIONAL, "argument::optional")
            ->setDescription('用于xxx模板消息发送');
    }

    protected function execute(Input $input, Output $output)
    {
        $options = array_filter($input->getOptions(true));
        if (empty($options)) {
            return $output->error('please enter options ^_^');
        }

        $config = [
            'app_id' => self::APP_ID,
            'secret' => self::APP_SECRET,
            'log' => [
                'default' => 'prod', // 默认使用的 channel,生产环境可以改为下面的 prod
                'channels' => [
                    // 测试环境
                    'dev' => [
                        'driver' => 'single',
                        'path' => '/tmp/easywechat.log',
                        'level' => 'debug',
                    ],
                    // 生产环境
                    'prod' => [
                        'driver' => 'daily',
                        'path' => '/runtime/log/easywechat.log',
                        'level' => 'info',
                    ],
                ],
            ],
        ];
        $this->app = Factory::officialAccount($config);

        try {
            $input->getOption('client') && $this->client();
            $input->getOption('test') && $this->test();
            $input->getOption('sync') && $this->sync();
            $input->getOption('send') && $this->send();
        } catch (\Exception $ex) {
            return Log::err(__METHOD__, $options, $ex->getMessage());
        }
    }

    protected function test()
    {
        // https://mp.weixin.qq.com/s/zXnuWR58oXulsrO7wLkeaw
        $params = [
            'touser' => self::TEST_OPENID,
            'template_id' => self::TEMPLATE_ID,
            'data' => self::TEMPLATE_DATA,
            'url' => self::URL,
        ];
        $res = $this->app->template_message->send($params);
        dd($res);
    }

    protected function client()
    {
        $client = new SwooleClient(SWOOLE_SOCK_TCP);
        if (!$client->connect('127.0.0.1', 9501, 0.5)) {
            exit("connect failed. Error: {$client->errCode}\n");
        }
        $msg = 'Task begin...';
        $this->startTime = microtime(true);
        $this->output->info($msg);
        //向服务器发送数据
        $client->send($msg);
    }

    protected function send()
    {
        $swoole = new SwooleServer("127.0.0.1", 9501);
        $swoole->set(self::SWOOLE_TESK);
        $swoole->on('receive', function ($swoole, $fd, $from_id, $data) {
            db('wx_openid')->where('num', 0)
                ->chunk(100, function ($data) use ($swoole) {
                    $taskId = $swoole->task($data);
                    $this->output->info("Task_id {$taskId} Delivering successful");
                });
        });

        //处理异步任务
        $swoole->on('task', function ($swoole, $task_id, $from_id, $data) {
            foreach ($data as $item) {
                $params = [
                    'touser' => $item['openid'],
                    'template_id' => self::TEMPLATE_ID,
                    'data' => self::TEMPLATE_DATA,
                    'url' => self::URL
                ];
                $wxopenid = WxOpenid::get(['openid' => $item['openid']]);
                try {
                    $this->app->template_message->send($params);
                } catch (\Exception $ex) {
                    Log::err('模板消息发送错误', $params);
                    $wxopenid->num -= 1;
                    $wxopenid->save();
                    continue;
                }
                $wxopenid->num += 1;
                $wxopenid->save();
            }
            $swoole->finish(count($data) . " -> OK");
        });

        //处理异步任务的结果
        $swoole->on('finish', function ($swoole, $task_id, $data) {
            $this->output->info("AsyncTask[$task_id] Finish: $data");
        });

        $swoole->start();
    }

    // 503289
    protected function sync($nextOpenId = null)
    {
        $res = $this->app->user->list($nextOpenId);
        $data = [];
        foreach ($res['data']['openid'] as $item) {
            $data[] = [
                'openid' => $item,
                'create_time' => NOW
            ];
        }
        WxOpenid::insertAll($data);
        if ($res['next_openid']) {
            $this->sync($res['next_openid']);
        }
    }
}

运行步骤

1.sync同步openid到数据库。因为微信每日调用用户列表等接口有次数限制,所以本着即开即食的原则,放在库里操作。
2.send起服务
3.client触发服务。

注意事项

1.必须在项目的根目录来运行php think msg命令
2.同时开始两个窗口,先执行php think msg -e,然后另一个窗口执行client。后期如果做成可视化的界面,创建定时任务的时候可以/usr/local/php/bin或(宝塔)/www/serve/php/72/bin server &&client来执行。

ps:如果有任何问题或看不懂的地方私信我,一起学习进步!

上一篇下一篇

猜你喜欢

热点阅读