声明:本站文章均为作者个人原创,图片均为实际截图。如有需要请收藏网站,禁止转载,谢谢配合!!!

本篇博客已配套视频讲解教程, 点击查看 Bilibili 视频教程



由于fastadmin用的是tp5.0版本,因此安装v1.1.6的think/queue队列

注意事项

  • 负责生产和消费的文件一定不要有任何登录鉴权,不要放在admin目录下面和api带鉴权的控制器方法,否则无法正确执行队列
  • fastadmin低版本未自带think/queue拓展,高版本自带think/queue队列,可以不用安装,直接使用

1、使用composera安装topthink/think-queue

高版本自带think/queue队列,可以不用安装,跳过此步

composer require topthink/think-queue:v1.1.6

2、配置队列连接信息

application/extra/queue.php

return [
    //'connector' => 'Sync'
    'connector' => 'redis',         // 队列驱动使用 redis 推荐, 可选 database
    'host' => '127.0.0.1',          // redis 主机地址
    'password' => 'xiaotao',             // redis 密码
    'port' => 6379,                     // redis 端口
    'select' => 2,                   // redis db 库, 建议显示指定 1-15 的数字均可,如果缓存驱动是 redis,避免和缓存驱动 select 冲突
    'timeout' => 0,                     // redis 超时时间
    'persistent' => false,              // redis 持续性,连接复用
];

3、生产端

test.php
此文件不要放在admin目录下面和api带鉴权的控制器方法

<?php

namespace app\api\controller;

use app\common\controller\Api;
use app\common\job\Test;
use think\Queue;

/**
 * 首页接口
 */
class Index extends Api
{
    protected $noNeedLogin = ['*'];
    protected $noNeedRight = ['*'];

    /**
     * 给用户发送消息
     * 生产端,将消息加入队列
     */
    public function sendMsg()
    {
        //消息内容
        $msgData = [
            'user_id' => 1,
            'time' => date('Y-m-d H:i:s'),
            'msg' => 'welcome to badianboke.com'
        ];
        //队列名称
        $queueName = 'badianbokeSendMsg';
        //加入队列
        Queue::push(Test::class, $msgData, $queueName);

    }

     public function register()
    {
        //1、用户注册成功
        //User::register(); //模拟用户注册成功

        //2、给用户发送消息
        $this->sendMsg();

        $this->success('注册成功');
    }
}

4、消费端

app\common\job\Test.php

默认调用方法是 fire 方法

<?php

namespace app\common\job;
use think\Db;
use think\Exception;
use think\Log;
use think\queue\Job;

class Test
{
    public function fire(Job $job, $data){
        try {
            //TODO
            Log::info('开始发送消息:' . json_encode($data));
            //1、给用户发送消息
            $flag = $this->insertMsg($data);

            if ($flag){
                //2、发送完成后 删除job
                $job->delete();
            }else{
                //任务轮询4次后删除
                if ($job->attempts() > 3) {
                    // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行
                    //$job->release(10);
                    // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                    //$job->failed();
                    // 第3种处理方式:删除任务
                    $job->delete();
                }
            }



        } catch (Exception $e) {
            // 队列执行失败
            Log::error('发送消息队列执行失败:' . json_encode($data));
        }
    }

    // 消息队列执行失败后会自动执行该方法
    public function failed($data)
    {
        Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data));
    }

    public function insertMsg($data){
       $result = Db::name('msg')->insert([
            'user_id' => $data['user_id'],
            'msg' => $data['msg'],
            'time' => $data['time'],
        ]);
        return $result == 1;
    }
}

5、运行

默认队列

php think queue:listen //监听 开发环境用
php think queue:work //只执行一次
nohup php think queue:work --daemon //守护进程,多次执行

指定队列 badianQueue

php think queue:listen --queue badianbokeSendMsg//监听 开发环境用
php think queue:work  --queue badianbokeSendMsg//只执行一次
nohup php think queue:work  --queue badianbokeSendMsg --daemon //守护进程,多次执行