AMQP组件

    1. return [
    2. 'default' => [
    3. 'host' => 'localhost',
    4. 'port' => 5672,
    5. 'user' => 'guest',
    6. 'password' => 'guest',
    7. 'vhost' => '/',
    8. 'concurrent' => [
    9. 'limit' => 1,
    10. ],
    11. 'pool' => [
    12. 'min_connections' => 1,
    13. 'max_connections' => 10,
    14. 'connect_timeout' => 10.0,
    15. 'wait_timeout' => 3.0,
    16. ],
    17. 'params' => [
    18. 'insist' => false,
    19. 'login_method' => 'AMQPLAIN',
    20. 'login_response' => null,
    21. 'locale' => 'en_US',
    22. 'connection_timeout' => 3.0,
    23. 'read_write_timeout' => 3.0,
    24. 'context' => null,
    25. 'keepalive' => false,
    26. 'heartbeat' => 0,
    27. ],
    28. ],
    29. 'pool2' => [
    30. ...
    31. ];

    可在 producer 或者 consumer__construct 函数中, 设置不同 pool.

    使用 gen:producer 命令创建一个 producer

    1. <?php
    2. declare(strict_types=1);
    3. namespace App\Amqp\Producers;
    4. use Hyperf\Amqp\Annotation\Producer;
    5. use Hyperf\Amqp\Message\ProducerMessage;
    6. use App\Models\User;
    7. /**
    8. * DemoProducer
    9. * @Producer(exchange="hyperf", routingKey="hyperf")
    10. */
    11. class DemoProducer extends ProducerMessage
    12. {
    13. public function __construct($id)
    14. // 设置不同 pool
    15. $this->poolName = 'pool2';
    16. $user = User::where('id', $id)->first();
    17. $this->payload = [
    18. 'id' => $id,
    19. 'data' => $user->toArray()
    20. ];
    21. }
    22. }

    通过 DI Container 获取 Hyperf\Amqp\Producer 实例,即可投递消息。以下实例直接使用 ApplicationContext 获取 Hyperf\Amqp\Producer 其实并不合理,DI Container 具体使用请到 依赖注入 章节中查看。

    php bin/hyperf.php gen:amqp-consumer DemoConsumer
    

    在 DemoConsumer 文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 exchangeroutingKeyqueue。其中 $data 就是解析后的消息数据。示例如下。

    使用 @Consumer 注解时需 use Hyperf\Amqp\Annotation\Consumer; 命名空间;

    框架会根据 Consumer 内的 consume 方法所返回的结果来决定该消息的响应行为,共有 4 中响应结果,分别为 \Hyperf\Amqp\Result::ACK\Hyperf\Amqp\Result::NACK\Hyperf\Amqp\Result::REQUEUE\Hyperf\Amqp\Result::DROP,每个返回值分别代表如下行为:

    返回值 行为
    \Hyperf\Amqp\Result::ACK 确认消息正确被消费掉了
    \Hyperf\Amqp\Result::NACK 消息没有被正确消费掉,以 basic_nack 方法来响应
    \Hyperf\Amqp\Result::REQUEUE 消息没有被正确消费掉,以 方法来响应,并使消息重新入列
    \Hyperf\Amqp\Result::DROP 消息没有被正确消费掉,以 basic_reject 方法来响应