异步队列

    配置文件位于 config/autoload/async_queue.php,如文件不存在可自行创建。

    1. <?php
    2. return [
    3. 'default' => [
    4. 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
    5. 'channel' => 'queue',
    6. 'timeout' => 2,
    7. 'retry_seconds' => 5,
    8. 'handle_timeout' => 10,
    9. 'processes' => 1,
    10. 'concurrent' => [
    11. 'limit' => 5,
    12. ],
    13. ],
    14. ];
    1. return [
    2. 'default' => [
    3. 'channel' => 'queue',
    4. 'retry_seconds' => [1, 5, 10, 20],
    5. 'processes' => 1,
    6. ],
    7. ];

    组件已经提供了默认子进程,只需要将它配置到 config/autoload/processes.php 中即可。

    当然,您也可以将以下 Process 添加到自己的项目中。

    1. <?php
    2. declare(strict_types=1);
    3. namespace App\Process;
    4. use Hyperf\AsyncQueue\Process\ConsumerProcess;
    5. use Hyperf\Process\Annotation\Process;
    6. /**
    7. * @Process(name="async-queue")
    8. */
    9. class AsyncQueueConsumer extends ConsumerProcess
    10. {

    生产消息

    传统方式

    1. <?php
    2. declare(strict_types=1);
    3. namespace App\Job;
    4. use Hyperf\AsyncQueue\Job;
    5. class ExampleJob extends Job
    6. {
    7. public $params;
    8. public function __construct($params)
    9. {
    10. // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
    11. $this->params = $params;
    12. }
    13. public function handle()
    14. {
    15. // 根据参数处理具体逻辑
    16. var_dump($this->params);
    17. }

    生产消息

    注解方式

    框架除了传统方式投递消息,还提供了注解方式。

    <?php
    
    declare(strict_types=1);
    
    namespace App\Service;
    
    use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;
    
    class QueueService
    {
        /**
         * @AsyncQueueMessage
         */
        public function example($params)
        {
            // 需要异步执行的代码逻辑
            var_dump($params);
        }
    }
    

    投递消息

    根据实际业务场景,动态投递消息到异步队列执行,我们演示在控制器动态投递消息,如下:

    <?php
    
    declare(strict_types=1);
    
    namespace App\Controller;
    
    use App\Service\QueueService;
    use Hyperf\Di\Annotation\Inject;
    use Hyperf\HttpServer\Annotation\AutoController;
    
    /**
     * @AutoController
     */
    class QueueController extends Controller
    {
        /**
         * @Inject
         * @var QueueService
         */
        protected $service;
    
        public function index()
        {
            $this->service->push([
                'group@hyperf.io',
                'https://doc.hyperf.io',
                'https://www.hyperf.io',
            ]);
    
            return 'success';
        }
    }