日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

本篇文章給大家帶來了關于thinkphp的相關知識,其中主要介紹了關于使用think-queue來實現普通隊列延遲隊列的相關內容,think-queue是thinkphp官方提供的一個消息隊列服務,下面一起來看一下,希望對大家有幫助。

一起聊聊thinkphp6使用think-queue實現普通隊列和延遲隊列

###TP6 隊列

TP6 中使用 think-queue 可以實現普通隊列和延遲隊列。

think-queue 是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:

消息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等

隊列的多隊列, 內存限制 ,啟動,停止,守護等

消息隊列可降級為同步執行

消息隊列實現過程

1、通過生產者推送消息到消息隊列服務中

2、消息隊列服務將收到的消息存入redis隊列中(zset)

3、消費者進行監聽隊列,當監聽到隊列有新的消息時,獲取隊列第一條

4、處理獲取下來的消息調用業務類進行處理相關業務

5、業務處理后,需要從隊列中刪除消息


composer 安裝 think-queue

composer require topthink/think-queue

配置文件

安裝完 think-queue 后會在 config 目錄中生成 queue.php,這個文件是隊列的配置文件。

tp6中提供了多種消息隊列的實現方式,默認使用sync,我這里選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

創建目錄及隊列消費類文件

在 app 目錄下創建 queue 目錄,然后在該目錄下新建一個抽象類 Queue.php 文件,作為基礎類

<?php
namespace app\queue;
use think\facade\Cache;
use think\queue\Job;
use think\facade\Log;
/**
 * Class Queue 隊列消費基礎類
 * @package app\queue
 */
abstract class Queue
{
    /**
     * @describe:fire是消息隊列默認調用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__));
            return ;
        }
 
        $jobId = $job->getJobId(); // 隊列的數據庫id或者redis key
        // $jobClassName = $job->getName(); // 隊列對象類
        // $queueName = $job->getQueue(); // 隊列名稱
 
        // 如果已經執行中或者執行完成就不再執行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }
 
        // 執行業務處理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 隊列執行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任務執行成功后刪除
            Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
        } else {
            // 檢查任務重試次數
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 隊列執行重試次數超過3次,執行失敗', __CLASS__, __FUNCTION__));
                 // 第1種處理方式:重新發布任務,該任務延遲10秒后再執行;也可以不指定秒數立即執行
                //$job->release(10); 
                // 第2種處理方式:原任務的基礎上1分鐘執行一次并增加嘗試次數
                //$job->failed();   
                // 第3種處理方式:刪除任務
                $job->delete(); // 任務執行后刪除
                Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
            }
        }
    }
 
    /**
     * 消息在到達消費者時可能已經不需要執行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任務執行的結果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查詢redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }
 
    /**
     * @describe: 根據消息中的數據進行實際的業務處理
     * @param $data 數據
     * @return bool 返回結果
     */
    abstract protected function execute($data): bool;
}

所有真正的消費類繼承基礎抽象類

<?php
namespace app\queue\test;
use app\queue\Queue;
class Test extends Queue
{
    protected function execute($data): bool
    {
       // 具體消費業務邏輯
    }
}

生產者邏輯

use think\facade\Queue;
 
// 普通隊列生成調用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);
 
// 延時隊列生成調用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延時隊列 10 秒后執行:
Queue::later(10 , Test::class, $data, $queueName);

開啟進程監聽任務并執行

php think queue:listen
php think queue:work


命令模式介紹

命令模式

queue:work 命令

work 命令: 該命令將啟動一個 work 進程來處理消息隊列。

php think queue:work --queue TestQueue

queue:listen 命令

listen 命令: 該命令將會創建一個 listen 父進程 ,然后由父進程通過 proc_open(‘php think queue:work’) 的方式來創建一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

php think queue:listen --queue TestQueue


命令行參數

Work 模式

php think queue:work \
--daemon            //是否循環執行,如果不加該參數,則該命令處理完下一個消息就退出
--queue  helloJobQueue  //要處理的隊列的名稱
--delay  0 \        //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
--force  \          //系統處于維護狀態時是否仍然處理任務,并未找到相關說明
--memory 128 \      //該進程允許使用的內存上限,以 M 為單位
--sleep  3 \        //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
--tries  2          //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認為0

Listen 模式

php think queue:listen \
--queue  helloJobQueue \   //監聽的隊列的名稱
--delay  0 \         //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
--memory 128 \       //該進程允許使用的內存上限,以 M 為單位
--sleep  3 \         //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效
--tries  0 \         //如果任務已經超過重發次數上限,則進入失敗處理邏輯,默認為0
--timeout 60         //創建的work子進程的允許執行的最長時間,以秒為單位

可以看到 listen 模式下,不包含 --deamon 參數,原因下面會說明

消息隊列的開始,停止與重啟

開始一個消息隊列:

php think queue:work

停止所有的消息隊列:

php think queue:restart

重啟所有的消息隊列:

php think queue:restart 
php think queue:work


分享到:
標簽:thinkphp6 think-queue thinkphp普通隊列 thinkphp延遲隊列
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定